9d4464ff065825841a98208a06857d70c4b82c8e
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / test / utest_image_upload.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19
20 import argparse
21 import asyncio
22 import contextlib
23 import io
24 import logging
25 import os
26 import sys
27 import tempfile
28 import time
29 import unittest
30 import uuid
31 import xmlrunner
32
33 from rift.mano import cloud
34 from rift.tasklets.rwimagemgr import upload
35 from rift.package import checksums
36 from rift.test.dts import async_test
37 import rw_status
38
39 import gi
40 gi.require_version('RwCal', '1.0')
41 gi.require_version('RwcalYang', '1.0')
42 gi.require_version('RwCloudYang', '1.0')
43 gi.require_version('RwLog', '1.0')
44 gi.require_version('RwTypes', '1.0')
45 from gi.repository import (
46 RwCal,
47 RwCloudYang,
48 RwLog,
49 RwTypes,
50 RwcalYang,
51 )
52
53 rwstatus = rw_status.rwstatus_from_exc_map({
54 IndexError: RwTypes.RwStatus.NOTFOUND,
55 KeyError: RwTypes.RwStatus.NOTFOUND,
56 })
57
58
59 class CreateImageMock(object):
60 def __init__(self, log):
61 self._log = log
62 self.image_name = None
63 self.image_checksum = None
64
65 self.do_fail = False
66 self.do_read_slow = False
67
68 self._image_msgs = []
69
70 def add_existing_image(self, image_msg):
71 self._log.debug("Appending existing image msg: %s", image_msg)
72 self._image_msgs.append(image_msg)
73
74 @rwstatus
75 def do_create_image(self, _, image):
76 if self.do_fail:
77 self._log.debug("Simulating failure")
78 raise ValueError("FAILED")
79
80 if not image.has_field("fileno"):
81 raise ValueError("Image must have fileno")
82
83 self.image_name = image.name
84
85 # Create a duplicate file descriptor to allow this code to own
86 # its own descritor (and thus close it) and allow the client to
87 # own and close its own descriptor.
88 new_fileno = os.dup(image.fileno)
89 with os.fdopen(new_fileno, 'rb') as hdl:
90 bytes_hdl = io.BytesIO()
91 if self.do_read_slow:
92 self._log.debug("slow reading from mock cal")
93 try:
94 num_bytes = 0
95 while True:
96 d = os.read(new_fileno, 1024)
97 num_bytes += len(d)
98 bytes_hdl.write(d)
99 if not d:
100 self._log.debug("read %s bytes", num_bytes)
101 return
102
103 time.sleep(.05)
104
105 except Exception as e:
106 self._log.warning("caught exception when reading: %s",
107 str(e))
108 raise
109
110 else:
111 bytes_hdl.write(hdl.read())
112
113 bytes_hdl.seek(0)
114 self.image_checksum = checksums.checksum(bytes_hdl)
115 bytes_hdl.close()
116
117 return str(uuid.uuid4())
118
119 @rwstatus
120 def do_get_image_list(self, account):
121 boxed_image_list = RwcalYang.VimResources()
122 for msg in self._image_msgs:
123 boxed_image_list.imageinfo_list.append(msg)
124
125 return boxed_image_list
126
127
128 def create_random_image_file():
129 with open("/dev/urandom", "rb") as rand_hdl:
130 file_hdl = tempfile.NamedTemporaryFile("r+b")
131 file_hdl.write(rand_hdl.read(1 * 1024 * 1024))
132 file_hdl.flush()
133 file_hdl.seek(0)
134 return file_hdl
135
136
137 def get_file_hdl_gen(file_hdl):
138 while True:
139 try:
140 d = file_hdl.read(64)
141 except ValueError:
142 return
143
144 if not d:
145 return
146
147 yield d
148
149
150 def get_image_checksum(image_hdl):
151 image_checksum = checksums.checksum(image_hdl)
152 image_hdl.seek(0)
153 return image_checksum
154
155
156 def create_image_info(image_name, image_checksum):
157 image = RwcalYang.ImageInfoItem()
158 image.name = image_name
159 image.checksum = image_checksum
160 image.disk_format = os.path.splitext(image_name)[1][1:]
161 image.container_format = "bare"
162
163 return image
164
165
166 class UploadTaskMixin(object):
167 def __init__(self, log, loop):
168 self._log = log
169 self._loop = loop
170
171 def create_image_hdl(self):
172 image_hdl = create_random_image_file()
173
174 return image_hdl
175
176 @contextlib.contextmanager
177 def create_upload_task(self, account, image_name="test.qcow2",
178 image_checksum=None, image_info=None):
179
180 with self.create_image_hdl() as image_hdl:
181
182 image_checksum = get_image_checksum(image_hdl) \
183 if image_checksum is None else image_checksum
184
185 image_info = create_image_info(image_name, image_checksum) \
186 if image_info is None else image_info
187
188 iter_hdl = get_file_hdl_gen(image_hdl)
189 pipe_gen = upload.GlanceImagePipeGen(self._log, self._loop, iter_hdl)
190
191 upload_task = upload.AccountImageUploadTask(
192 self._log, self._loop, account, image_info, pipe_gen.read_hdl,
193 write_canceller=pipe_gen
194 )
195 pipe_gen.start()
196
197 yield upload_task
198
199
200 class ImageMockMixin(object):
201 ACCOUNT_MSG = RwCloudYang.CloudAccount(
202 name="mock",
203 account_type="mock",
204 )
205
206 def __init__(self, log):
207 self._log = log
208 self._account = cloud.CloudAccount(
209 self._log,
210 RwLog.Ctx.new(__file__), ImageMockMixin.ACCOUNT_MSG
211 )
212
213 self._create_image_mock = CreateImageMock(self._log)
214
215 # Mock the create_image call
216 self._account.cal.create_image = self._create_image_mock.do_create_image
217 self._account.cal.get_image_list = self._create_image_mock.do_get_image_list
218
219 @property
220 def account(self):
221 return self._account
222
223 @property
224 def image_mock(self):
225 return self._create_image_mock
226
227
228 class TestImageUploadTask(unittest.TestCase, UploadTaskMixin, ImageMockMixin):
229 def __init__(self, *args, **kwargs):
230 self._loop = asyncio.get_event_loop()
231 self._log = logging.getLogger(__file__)
232
233 ImageMockMixin.__init__(self, self._log)
234 UploadTaskMixin.__init__(self, self._log, self._loop)
235 unittest.TestCase.__init__(self, *args, **kwargs)
236
237 @async_test
238 def test_upload_image_task(self):
239 with self.create_upload_task(self.account) as upload_task:
240 yield from upload_task.start()
241
242 self.assertIn("QUEUED", upload_task.state_stack)
243 self.assertIn("CHECK_IMAGE_EXISTS", upload_task.state_stack)
244 self.assertIn("UPLOADING", upload_task.state_stack)
245 self.assertIn("COMPLETED", upload_task.state_stack)
246
247 self.assertEqual("COMPLETED", upload_task.state)
248
249 self.assertEqual(self.image_mock.image_name, upload_task.image_name)
250 self.assertEqual(self.image_mock.image_checksum, upload_task.image_checksum)
251
252 task_pb_msg = upload_task.pb_msg
253 self.assertEqual(upload_task.image_name, task_pb_msg.image_name)
254
255 @async_test
256 def test_cancel_image_task(self):
257 @asyncio.coroutine
258 def wait_for_task_state(upload_task, state, timeout=10):
259 start_time = time.time()
260 while (time.time() - start_time) < timeout:
261 if upload_task.state == state:
262 return
263
264 yield from asyncio.sleep(.01)
265
266 raise asyncio.TimeoutError()
267
268 self.image_mock.do_read_slow = True
269
270 with self.create_upload_task(self.account) as upload_task:
271 upload_task.start()
272 yield from wait_for_task_state(upload_task, "UPLOADING")
273 upload_task.stop()
274 self.assertEqual("CANCELLING", upload_task.state)
275 yield from wait_for_task_state(upload_task, "CANCELLED")
276
277 @async_test
278 def test_create_image_failed(self):
279 self.image_mock.do_fail = True
280
281 with self.create_upload_task(self.account) as upload_task:
282 yield from upload_task.start()
283
284 self.assertEqual("FAILED", upload_task.state)
285
286 @async_test
287 def test_create_image_name_and_checksum_exists(self):
288 with self.create_upload_task(self.account) as upload_task:
289 image_entry = RwcalYang.ImageInfoItem(
290 id="asdf",
291 name=upload_task.image_name,
292 checksum=upload_task.image_checksum
293 )
294 self.image_mock.add_existing_image(image_entry)
295
296 yield from upload_task.start()
297
298 # No image should have been uploaded, since the name and checksum
299 self.assertEqual(self.image_mock.image_checksum, None)
300
301 self.assertEqual("COMPLETED", upload_task.state)
302 self.assertTrue("UPLOADING" not in upload_task.state_stack)
303
304
305 class TestUploadJob(unittest.TestCase, UploadTaskMixin, ImageMockMixin):
306 def __init__(self, *args, **kwargs):
307 self._loop = asyncio.get_event_loop()
308 self._log = logging.getLogger(__file__)
309
310 ImageMockMixin.__init__(self, self._log)
311 UploadTaskMixin.__init__(self, self._log, self._loop)
312 unittest.TestCase.__init__(self, *args, **kwargs)
313
314 @async_test
315 def test_single_task_upload_job(self):
316 with self.create_upload_task(self.account) as upload_task:
317 job = upload.ImageUploadJob(self._log, self._loop, [upload_task])
318 self.assertEqual("QUEUED", job.state)
319 yield from job.start()
320
321 self.assertIn("QUEUED", job.state_stack)
322 self.assertIn("IN_PROGRESS", job.state_stack)
323 self.assertIn("COMPLETED", job.state_stack)
324
325 self.assertEqual("COMPLETED", job.state)
326
327 job_pb_msg = job.pb_msg
328 self.assertEqual("COMPLETED", job_pb_msg.status)
329
330 @async_test
331 def test_multiple_tasks_upload_job(self):
332 with self.create_upload_task(self.account) as upload_task1:
333 with self.create_upload_task(self.account) as upload_task2:
334 job = upload.ImageUploadJob(
335 self._log, self._loop, [upload_task1, upload_task2])
336 yield from job.start()
337
338 self.assertEqual("COMPLETED", job.state)
339
340 @async_test
341 def test_failed_task_in_job(self):
342 self.image_mock.do_fail = True
343
344 with self.create_upload_task(self.account) as upload_task:
345 job = upload.ImageUploadJob(
346 self._log, self._loop, [upload_task])
347 yield from job.start()
348
349 self.assertEqual("FAILED", job.state)
350
351 @async_test
352 def test_cancel_job(self):
353 @asyncio.coroutine
354 def wait_for_job_state(upload_job, state, timeout=10):
355 start_time = time.time()
356 while (time.time() - start_time) < timeout:
357 if upload_job.state == state:
358 return
359
360 yield from asyncio.sleep(.01)
361
362 raise asyncio.TimeoutError()
363
364 self.image_mock.do_read_slow = True
365
366 with self.create_upload_task(self.account) as upload_task:
367 job = upload.ImageUploadJob(
368 self._log, self._loop, [upload_task])
369 job.start()
370 yield from wait_for_job_state(job, "IN_PROGRESS")
371 job.stop()
372 self.assertEqual("CANCELLING", job.state)
373 yield from wait_for_job_state(job, "CANCELLED")
374
375 self.assertEqual("CANCELLED", job.state)
376
377
378 class TestUploadJobController(unittest.TestCase, UploadTaskMixin, ImageMockMixin):
379 def __init__(self, *args, **kwargs):
380 self._loop = asyncio.get_event_loop()
381 self._log = logging.getLogger(__file__)
382
383 ImageMockMixin.__init__(self, self._log)
384 unittest.TestCase.__init__(self, *args, **kwargs)
385
386 @async_test
387 def test_controller_single_task_job(self):
388 controller = upload.ImageUploadJobController(
389 self._log, self._loop
390 )
391
392 with self.create_upload_task(self.account) as upload_task:
393 job_id = controller.create_job([upload_task])
394 self.assertEqual(len(controller.active_jobs), 1)
395 self.assertEqual(len(controller.completed_jobs), 0)
396
397 job = controller.get_job(job_id)
398 yield from job.wait()
399
400 self.assertEqual(len(controller.active_jobs), 0)
401 self.assertEqual(len(controller.completed_jobs), 1)
402
403 upload_jobs_pb_msg = controller.pb_msg
404 self.assertEqual(len(upload_jobs_pb_msg.job), 1)
405
406 @async_test
407 def test_controller_multi_task_job(self):
408 controller = upload.ImageUploadJobController(
409 self._log, self._loop
410 )
411
412 with self.create_upload_task(self.account) as upload_task1:
413 with self.create_upload_task(self.account) as upload_task2:
414 job_id = controller.create_job([upload_task1, upload_task2])
415 self.assertEqual(len(controller.active_jobs), 1)
416 self.assertEqual(len(controller.completed_jobs), 0)
417
418 job = controller.get_job(job_id)
419 yield from job.wait()
420 self.assertEqual(len(controller.active_jobs), 0)
421 self.assertEqual(len(controller.completed_jobs), 1)
422
423 @async_test
424 def test_controller_multi_jobs(self):
425 controller = upload.ImageUploadJobController(
426 self._log, self._loop
427 )
428
429 with self.create_upload_task(self.account) as upload_task1:
430 with self.create_upload_task(self.account) as upload_task2:
431 job1_id = controller.create_job([upload_task1])
432 job2_id = controller.create_job([upload_task2])
433 self.assertEqual(len(controller.active_jobs), 2)
434 self.assertEqual(len(controller.completed_jobs), 0)
435
436 job1 = controller.get_job(job1_id)
437 job2 = controller.get_job(job2_id)
438
439 yield from asyncio.wait(
440 [job1.wait(), job2.wait()],
441 loop=self._loop
442 )
443
444 self.assertEqual(len(controller.active_jobs), 0)
445 self.assertEqual(len(controller.completed_jobs), 2)
446
447
448 class TestRateCalc(unittest.TestCase):
449 def test_no_smoothing(self):
450 calc = upload.ByteRateCalculator(1)
451 self.assertEqual(0, calc.rate)
452 calc.add_measurement(100, 1)
453 self.assertEqual(100, calc.rate)
454 calc.add_measurement(400, 2)
455 self.assertEqual(200, calc.rate)
456
457 def test_smoothing(self):
458 calc = upload.ByteRateCalculator(2)
459 calc.add_measurement(100, 1)
460 self.assertEqual(100, calc.rate)
461
462 calc.add_measurement(400, 2)
463 self.assertEqual(150, calc.rate)
464
465 calc.add_measurement(400, 2)
466 self.assertEqual(175, calc.rate)
467
468
469 class TestUploadProgress(unittest.TestCase):
470 def setUp(self):
471 self._loop = asyncio.get_event_loop()
472 self._log = logging.getLogger(__file__)
473
474 def test_write_proxy(self):
475 mem_hdl = io.BytesIO()
476 proxy = upload.UploadProgressWriteProxy(self._log, self._loop, 1000, mem_hdl)
477
478 data = b'test_bytes'
479
480 proxy.write(data)
481 self.assertEqual(data, mem_hdl.getvalue())
482 self.assertEqual(len(data), proxy.bytes_written)
483 self.assertEqual(1000, proxy.bytes_total)
484 self.assertEqual(1, proxy.progress_percent)
485
486 proxy.close()
487 self.assertTrue(mem_hdl.closed)
488
489
490 def main(argv=sys.argv[1:]):
491 logging.basicConfig(format='TEST %(message)s')
492
493 runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])
494 parser = argparse.ArgumentParser()
495 parser.add_argument('-v', '--verbose', action='store_true')
496 parser.add_argument('-n', '--no-runner', action='store_true')
497
498 args, unknown = parser.parse_known_args(argv)
499 if args.no_runner:
500 runner = None
501
502 # Set the global logging level
503 logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.ERROR)
504
505 # The unittest framework requires a program name, so use the name of this
506 # file instead (we do not want to have to pass a fake program name to main
507 # when this is called from the interpreter).
508 unittest.main(argv=[__file__] + unknown + ["-v"], testRunner=runner)
509
510 if __name__ == '__main__':
511 main()