update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / test / utest_image_upload.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import argparse
21 import asyncio
22 import contextlib
23 import io
24 import logging
25 import os
26 import sys
27 import tempfile
28 import time
29 import unittest
30 import uuid
31 import xmlrunner
32
33 from rift.mano import cloud
34 from rift.tasklets.rwimagemgr import upload
35 from rift.package import checksums
36 from rift.test.dts import async_test
37 from rift.mano.utils.project import ManoProject, DEFAULT_PROJECT
38 import rw_status
39
40 import gi
41 gi.require_version('RwCal', '1.0')
42 gi.require_version('RwcalYang', '1.0')
43 gi.require_version('RwCloudYang', '1.0')
44 gi.require_version('RwLog', '1.0')
45 gi.require_version('RwTypes', '1.0')
46 from gi.repository import (
47 RwCal,
48 RwCloudYang,
49 RwLog,
50 RwTypes,
51 RwcalYang,
52 )
53
54 rwstatus = rw_status.rwstatus_from_exc_map({
55 IndexError: RwTypes.RwStatus.NOTFOUND,
56 KeyError: RwTypes.RwStatus.NOTFOUND,
57 })
58
59
60 class CreateImageMock(object):
61 def __init__(self, log):
62 self._log = log
63 self.image_name = None
64 self.image_checksum = None
65
66 self.do_fail = False
67 self.do_read_slow = False
68
69 self._image_msgs = []
70
71 def add_existing_image(self, image_msg):
72 self._log.debug("Appending existing image msg: %s", image_msg)
73 self._image_msgs.append(image_msg)
74
75 @rwstatus
76 def do_create_image(self, _, image):
77 if self.do_fail:
78 self._log.debug("Simulating failure")
79 raise ValueError("FAILED")
80
81 if not image.has_field("fileno"):
82 raise ValueError("Image must have fileno")
83
84 self.image_name = image.name
85
86 # Create a duplicate file descriptor to allow this code to own
87 # its own descritor (and thus close it) and allow the client to
88 # own and close its own descriptor.
89 new_fileno = os.dup(image.fileno)
90 with os.fdopen(new_fileno, 'rb') as hdl:
91 bytes_hdl = io.BytesIO()
92 if self.do_read_slow:
93 self._log.debug("slow reading from mock cal")
94 try:
95 num_bytes = 0
96 while True:
97 d = os.read(new_fileno, 1024)
98 num_bytes += len(d)
99 bytes_hdl.write(d)
100 if not d:
101 self._log.debug("read %s bytes", num_bytes)
102 return
103
104 time.sleep(.05)
105
106 except Exception as e:
107 self._log.warning("caught exception when reading: %s",
108 str(e))
109 raise
110
111 else:
112 bytes_hdl.write(hdl.read())
113
114 bytes_hdl.seek(0)
115 self.image_checksum = checksums.checksum(bytes_hdl)
116 bytes_hdl.close()
117
118 return str(uuid.uuid4())
119
120 @rwstatus
121 def do_get_image_list(self, account):
122 boxed_image_list = RwcalYang.YangData_RwProject_Project_VimResources()
123 for msg in self._image_msgs:
124 boxed_image_list.imageinfo_list.append(msg)
125
126 return boxed_image_list
127
128
129 def create_random_image_file():
130 with open("/dev/urandom", "rb") as rand_hdl:
131 file_hdl = tempfile.NamedTemporaryFile("r+b")
132 file_hdl.write(rand_hdl.read(1 * 1024 * 1024))
133 file_hdl.flush()
134 file_hdl.seek(0)
135 return file_hdl
136
137
138 def get_file_hdl_gen(file_hdl):
139 while True:
140 try:
141 d = file_hdl.read(64)
142 except ValueError:
143 return
144
145 if not d:
146 return
147
148 yield d
149
150
151 def get_image_checksum(image_hdl):
152 image_checksum = checksums.checksum(image_hdl)
153 image_hdl.seek(0)
154 return image_checksum
155
156
157 def create_image_info(image_name, image_checksum):
158 image = RwcalYang.YangData_RwProject_Project_VimResources_ImageinfoList()
159 image.name = image_name
160 image.checksum = image_checksum
161 image.disk_format = os.path.splitext(image_name)[1][1:]
162 image.container_format = "bare"
163
164 return image
165
166
167 class UploadTaskMixin(object):
168 def __init__(self, log, loop):
169 self._log = log
170 self._loop = loop
171
172 def create_image_hdl(self):
173 image_hdl = create_random_image_file()
174
175 return image_hdl
176
177 @contextlib.contextmanager
178 def create_upload_task(self, account, image_name="test.qcow2",
179 image_checksum=None, image_info=None):
180
181 with self.create_image_hdl() as image_hdl:
182
183 image_checksum = get_image_checksum(image_hdl) \
184 if image_checksum is None else image_checksum
185
186 image_info = create_image_info(image_name, image_checksum) \
187 if image_info is None else image_info
188
189 iter_hdl = get_file_hdl_gen(image_hdl)
190 pipe_gen = upload.GlanceImagePipeGen(self._log, self._loop, iter_hdl)
191
192 upload_task = upload.AccountImageUploadTask(
193 self._log, self._loop, account, image_info, pipe_gen.read_hdl,
194 write_canceller=pipe_gen
195 )
196 pipe_gen.start()
197
198 yield upload_task
199
200
201 class ImageMockMixin(object):
202 ACCOUNT_MSG = RwCloudYang.YangData_RwProject_Project_Cloud_Account(
203 name="mock",
204 account_type="mock",
205 )
206
207 def __init__(self, log):
208 self._log = log
209 self._account = cloud.CloudAccount(
210 self._log,
211 RwLog.Ctx.new(__file__), ImageMockMixin.ACCOUNT_MSG
212 )
213
214 self._create_image_mock = CreateImageMock(self._log)
215
216 # Mock the create_image call
217 self._account.cal.create_image = self._create_image_mock.do_create_image
218 self._account.cal.get_image_list = self._create_image_mock.do_get_image_list
219
220 @property
221 def account(self):
222 return self._account
223
224 @property
225 def image_mock(self):
226 return self._create_image_mock
227
228
229 class TestImageUploadTask(unittest.TestCase, UploadTaskMixin, ImageMockMixin):
230 def __init__(self, *args, **kwargs):
231 self._loop = asyncio.get_event_loop()
232 self._log = logging.getLogger(__file__)
233
234 ImageMockMixin.__init__(self, self._log)
235 UploadTaskMixin.__init__(self, self._log, self._loop)
236 unittest.TestCase.__init__(self, *args, **kwargs)
237
238 @async_test
239 def test_upload_image_task(self):
240 with self.create_upload_task(self.account) as upload_task:
241 yield from upload_task.start()
242
243 self.assertIn("QUEUED", upload_task.state_stack)
244 self.assertIn("CHECK_IMAGE_EXISTS", upload_task.state_stack)
245 self.assertIn("UPLOADING", upload_task.state_stack)
246 self.assertIn("COMPLETED", upload_task.state_stack)
247
248 self.assertEqual("COMPLETED", upload_task.state)
249
250 self.assertEqual(self.image_mock.image_name, upload_task.image_name)
251 self.assertEqual(self.image_mock.image_checksum, upload_task.image_checksum)
252
253 task_pb_msg = upload_task.pb_msg
254 self.assertEqual(upload_task.image_name, task_pb_msg.image_name)
255
256 # TODO: Fix this
257 @unittest.skip("Causes coredump in OSM")
258 @async_test
259 def test_cancel_image_task(self):
260 @asyncio.coroutine
261 def wait_for_task_state(upload_task, state, timeout=10):
262 start_time = time.time()
263 while (time.time() - start_time) < timeout:
264 if upload_task.state == state:
265 return
266
267 yield from asyncio.sleep(.01)
268
269 raise asyncio.TimeoutError()
270
271 self.image_mock.do_read_slow = True
272
273 with self.create_upload_task(self.account) as upload_task:
274 upload_task.start()
275 yield from wait_for_task_state(upload_task, "UPLOADING")
276 upload_task.stop()
277 self.assertEqual("CANCELLING", upload_task.state)
278 yield from wait_for_task_state(upload_task, "CANCELLED")
279
280 @async_test
281 def test_create_image_failed(self):
282 self.image_mock.do_fail = True
283
284 with self.create_upload_task(self.account) as upload_task:
285 yield from upload_task.start()
286
287 self.assertEqual("FAILED", upload_task.state)
288
289 @async_test
290 def test_create_image_name_and_checksum_exists(self):
291 with self.create_upload_task(self.account) as upload_task:
292 image_entry = RwcalYang.YangData_RwProject_Project_VimResources_ImageinfoList(
293 id="asdf",
294 name=upload_task.image_name,
295 checksum=upload_task.image_checksum
296 )
297 self.image_mock.add_existing_image(image_entry)
298
299 yield from upload_task.start()
300
301 # No image should have been uploaded, since the name and checksum
302 self.assertEqual(self.image_mock.image_checksum, None)
303
304 self.assertEqual("COMPLETED", upload_task.state)
305 self.assertTrue("UPLOADING" not in upload_task.state_stack)
306
307
308 class TestUploadJob(unittest.TestCase, UploadTaskMixin, ImageMockMixin):
309 def __init__(self, *args, **kwargs):
310 self._loop = asyncio.get_event_loop()
311 self._log = logging.getLogger(__file__)
312
313 ImageMockMixin.__init__(self, self._log)
314 UploadTaskMixin.__init__(self, self._log, self._loop)
315 unittest.TestCase.__init__(self, *args, **kwargs)
316
317 @async_test
318 def test_single_task_upload_job(self):
319 with self.create_upload_task(self.account) as upload_task:
320 job = upload.ImageUploadJob(self._log, self._loop, [upload_task])
321 self.assertEqual("QUEUED", job.state)
322 yield from job.start()
323
324 self.assertIn("QUEUED", job.state_stack)
325 self.assertIn("IN_PROGRESS", job.state_stack)
326 self.assertIn("COMPLETED", job.state_stack)
327
328 self.assertEqual("COMPLETED", job.state)
329
330 job_pb_msg = job.pb_msg
331 self.assertEqual("COMPLETED", job_pb_msg.status)
332
333 @async_test
334 def test_multiple_tasks_upload_job(self):
335 with self.create_upload_task(self.account) as upload_task1:
336 with self.create_upload_task(self.account) as upload_task2:
337 job = upload.ImageUploadJob(
338 self._log, self._loop, [upload_task1, upload_task2])
339 yield from job.start()
340
341 self.assertEqual("COMPLETED", job.state)
342
343 @async_test
344 def test_failed_task_in_job(self):
345 self.image_mock.do_fail = True
346
347 with self.create_upload_task(self.account) as upload_task:
348 job = upload.ImageUploadJob(
349 self._log, self._loop, [upload_task])
350 yield from job.start()
351
352 self.assertEqual("FAILED", job.state)
353
354 # TODO: Fix this
355 @unittest.skip("Causes coredump in OSM")
356 @async_test
357 def test_cancel_job(self):
358 @asyncio.coroutine
359 def wait_for_job_state(upload_job, state, timeout=10):
360 start_time = time.time()
361 while (time.time() - start_time) < timeout:
362 if upload_job.state == state:
363 return
364
365 yield from asyncio.sleep(.01)
366
367 raise asyncio.TimeoutError()
368
369 self.image_mock.do_read_slow = True
370
371 with self.create_upload_task(self.account) as upload_task:
372 job = upload.ImageUploadJob(
373 self._log, self._loop, [upload_task])
374 job.start()
375 yield from wait_for_job_state(job, "IN_PROGRESS")
376 job.stop()
377 self.assertEqual("CANCELLING", job.state)
378 yield from wait_for_job_state(job, "CANCELLED")
379
380 self.assertEqual("CANCELLED", job.state)
381
382
383 class TestUploadJobController(unittest.TestCase, UploadTaskMixin, ImageMockMixin):
384 def __init__(self, *args, **kwargs):
385 self._loop = asyncio.get_event_loop()
386 self._log = logging.getLogger(__file__)
387 self._project = ManoProject(self._log, name=DEFAULT_PROJECT)
388 self._project._loop = self._loop
389 ImageMockMixin.__init__(self, self._log)
390 unittest.TestCase.__init__(self, *args, **kwargs)
391
392 @async_test
393 def test_controller_single_task_job(self):
394 controller = upload.ImageUploadJobController(self._project)
395
396 with self.create_upload_task(self.account) as upload_task:
397 job_id = controller.create_job([upload_task])
398 self.assertEqual(len(controller.active_jobs), 1)
399 self.assertEqual(len(controller.completed_jobs), 0)
400
401 job = controller.get_job(job_id)
402 yield from job.wait()
403
404 self.assertEqual(len(controller.active_jobs), 0)
405 self.assertEqual(len(controller.completed_jobs), 1)
406
407 upload_jobs_pb_msg = controller.pb_msg
408 self.assertEqual(len(upload_jobs_pb_msg.job), 1)
409
410 @async_test
411 def test_controller_multi_task_job(self):
412 controller = upload.ImageUploadJobController(self._project)
413
414 with self.create_upload_task(self.account) as upload_task1:
415 with self.create_upload_task(self.account) as upload_task2:
416 job_id = controller.create_job([upload_task1, upload_task2])
417 self.assertEqual(len(controller.active_jobs), 1)
418 self.assertEqual(len(controller.completed_jobs), 0)
419
420 job = controller.get_job(job_id)
421 yield from job.wait()
422 self.assertEqual(len(controller.active_jobs), 0)
423 self.assertEqual(len(controller.completed_jobs), 1)
424
425 @async_test
426 def test_controller_multi_jobs(self):
427 controller = upload.ImageUploadJobController(self._project)
428
429 with self.create_upload_task(self.account) as upload_task1:
430 with self.create_upload_task(self.account) as upload_task2:
431 job1_id = controller.create_job([upload_task1])
432 job2_id = controller.create_job([upload_task2])
433 self.assertEqual(len(controller.active_jobs), 2)
434 self.assertEqual(len(controller.completed_jobs), 0)
435
436 job1 = controller.get_job(job1_id)
437 job2 = controller.get_job(job2_id)
438
439 yield from asyncio.wait(
440 [job1.wait(), job2.wait()],
441 loop=self._loop
442 )
443
444 self.assertEqual(len(controller.active_jobs), 0)
445 self.assertEqual(len(controller.completed_jobs), 2)
446
447
448 class TestRateCalc(unittest.TestCase):
449 def test_no_smoothing(self):
450 calc = upload.ByteRateCalculator(1)
451 self.assertEqual(0, calc.rate)
452 calc.add_measurement(100, 1)
453 self.assertEqual(100, calc.rate)
454 calc.add_measurement(400, 2)
455 self.assertEqual(200, calc.rate)
456
457 def test_smoothing(self):
458 calc = upload.ByteRateCalculator(2)
459 calc.add_measurement(100, 1)
460 self.assertEqual(100, calc.rate)
461
462 calc.add_measurement(400, 2)
463 self.assertEqual(150, calc.rate)
464
465 calc.add_measurement(400, 2)
466 self.assertEqual(175, calc.rate)
467
468
469 class TestUploadProgress(unittest.TestCase):
470 def setUp(self):
471 self._loop = asyncio.get_event_loop()
472 self._log = logging.getLogger(__file__)
473
474 def test_write_proxy(self):
475 mem_hdl = io.BytesIO()
476 proxy = upload.UploadProgressWriteProxy(self._log, self._loop, 1000, mem_hdl)
477
478 data = b'test_bytes'
479
480 proxy.write(data)
481 self.assertEqual(data, mem_hdl.getvalue())
482 self.assertEqual(len(data), proxy.bytes_written)
483 self.assertEqual(1000, proxy.bytes_total)
484 self.assertEqual(1, proxy.progress_percent)
485
486 proxy.close()
487 self.assertTrue(mem_hdl.closed)
488
489
490 def main(argv=sys.argv[1:]):
491 logging.basicConfig(format='TEST %(message)s')
492
493 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
494 parser = argparse.ArgumentParser()
495 parser.add_argument('-v', '--verbose', action='store_true')
496 parser.add_argument('-n', '--no-runner', action='store_true')
497
498 args, unknown = parser.parse_known_args(argv)
499 if args.no_runner:
500 runner = None
501
502 # Set the global logging level
503 logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
504
505 # The unittest framework requires a program name, so use the name of this
506 # file instead (we do not want to have to pass a fake program name to main
507 # when this is called from the interpreter).
508 unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
509
510 if __name__ == '__main__':
511 main()