4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
33 from rift
.mano
import cloud
34 from rift
.tasklets
.rwimagemgr
import upload
35 from rift
.package
import checksums
36 from rift
.test
.dts
import async_test
40 gi
.require_version('RwCal', '1.0')
41 gi
.require_version('RwcalYang', '1.0')
42 gi
.require_version('RwCloudYang', '1.0')
43 gi
.require_version('RwLog', '1.0')
44 gi
.require_version('RwTypes', '1.0')
45 from gi
.repository
import (
53 rwstatus
= rw_status
.rwstatus_from_exc_map({
54 IndexError: RwTypes
.RwStatus
.NOTFOUND
,
55 KeyError: RwTypes
.RwStatus
.NOTFOUND
,
59 class CreateImageMock(object):
60 def __init__(self
, log
):
62 self
.image_name
= None
63 self
.image_checksum
= None
66 self
.do_read_slow
= False
70 def add_existing_image(self
, image_msg
):
71 self
._log
.debug("Appending existing image msg: %s", image_msg
)
72 self
._image
_msgs
.append(image_msg
)
75 def do_create_image(self
, _
, image
):
77 self
._log
.debug("Simulating failure")
78 raise ValueError("FAILED")
80 if not image
.has_field("fileno"):
81 raise ValueError("Image must have fileno")
83 self
.image_name
= image
.name
85 # Create a duplicate file descriptor to allow this code to own
86 # its own descritor (and thus close it) and allow the client to
87 # own and close its own descriptor.
88 new_fileno
= os
.dup(image
.fileno
)
89 with os
.fdopen(new_fileno
, 'rb') as hdl
:
90 bytes_hdl
= io
.BytesIO()
92 self
._log
.debug("slow reading from mock cal")
96 d
= os
.read(new_fileno
, 1024)
100 self
._log
.debug("read %s bytes", num_bytes
)
105 except Exception as e
:
106 self
._log
.warning("caught exception when reading: %s",
111 bytes_hdl
.write(hdl
.read())
114 self
.image_checksum
= checksums
.checksum(bytes_hdl
)
117 return str(uuid
.uuid4())
120 def do_get_image_list(self
, account
):
121 boxed_image_list
= RwcalYang
.VimResources()
122 for msg
in self
._image
_msgs
:
123 boxed_image_list
.imageinfo_list
.append(msg
)
125 return boxed_image_list
128 def create_random_image_file():
129 with
open("/dev/urandom", "rb") as rand_hdl
:
130 file_hdl
= tempfile
.NamedTemporaryFile("r+b")
131 file_hdl
.write(rand_hdl
.read(1 * 1024 * 1024))
137 def get_file_hdl_gen(file_hdl
):
140 d
= file_hdl
.read(64)
150 def get_image_checksum(image_hdl
):
151 image_checksum
= checksums
.checksum(image_hdl
)
153 return image_checksum
156 def create_image_info(image_name
, image_checksum
):
157 image
= RwcalYang
.ImageInfoItem()
158 image
.name
= image_name
159 image
.checksum
= image_checksum
160 image
.disk_format
= os
.path
.splitext(image_name
)[1][1:]
161 image
.container_format
= "bare"
166 class UploadTaskMixin(object):
167 def __init__(self
, log
, loop
):
171 def create_image_hdl(self
):
172 image_hdl
= create_random_image_file()
176 @contextlib.contextmanager
177 def create_upload_task(self
, account
, image_name
="test.qcow2",
178 image_checksum
=None, image_info
=None):
180 with self
.create_image_hdl() as image_hdl
:
182 image_checksum
= get_image_checksum(image_hdl
) \
183 if image_checksum
is None else image_checksum
185 image_info
= create_image_info(image_name
, image_checksum
) \
186 if image_info
is None else image_info
188 iter_hdl
= get_file_hdl_gen(image_hdl
)
189 pipe_gen
= upload
.GlanceImagePipeGen(self
._log
, self
._loop
, iter_hdl
)
191 upload_task
= upload
.AccountImageUploadTask(
192 self
._log
, self
._loop
, account
, image_info
, pipe_gen
.read_hdl
,
193 write_canceller
=pipe_gen
200 class ImageMockMixin(object):
201 ACCOUNT_MSG
= RwCloudYang
.CloudAccount(
206 def __init__(self
, log
):
208 self
._account
= cloud
.CloudAccount(
210 RwLog
.Ctx
.new(__file__
), ImageMockMixin
.ACCOUNT_MSG
213 self
._create
_image
_mock
= CreateImageMock(self
._log
)
215 # Mock the create_image call
216 self
._account
.cal
.create_image
= self
._create
_image
_mock
.do_create_image
217 self
._account
.cal
.get_image_list
= self
._create
_image
_mock
.do_get_image_list
224 def image_mock(self
):
225 return self
._create
_image
_mock
228 class TestImageUploadTask(unittest
.TestCase
, UploadTaskMixin
, ImageMockMixin
):
229 def __init__(self
, *args
, **kwargs
):
230 self
._loop
= asyncio
.get_event_loop()
231 self
._log
= logging
.getLogger(__file__
)
233 ImageMockMixin
.__init
__(self
, self
._log
)
234 UploadTaskMixin
.__init
__(self
, self
._log
, self
._loop
)
235 unittest
.TestCase
.__init
__(self
, *args
, **kwargs
)
238 def test_upload_image_task(self
):
239 with self
.create_upload_task(self
.account
) as upload_task
:
240 yield from upload_task
.start()
242 self
.assertIn("QUEUED", upload_task
.state_stack
)
243 self
.assertIn("CHECK_IMAGE_EXISTS", upload_task
.state_stack
)
244 self
.assertIn("UPLOADING", upload_task
.state_stack
)
245 self
.assertIn("COMPLETED", upload_task
.state_stack
)
247 self
.assertEqual("COMPLETED", upload_task
.state
)
249 self
.assertEqual(self
.image_mock
.image_name
, upload_task
.image_name
)
250 self
.assertEqual(self
.image_mock
.image_checksum
, upload_task
.image_checksum
)
252 task_pb_msg
= upload_task
.pb_msg
253 self
.assertEqual(upload_task
.image_name
, task_pb_msg
.image_name
)
256 def test_cancel_image_task(self
):
258 def wait_for_task_state(upload_task
, state
, timeout
=10):
259 start_time
= time
.time()
260 while (time
.time() - start_time
) < timeout
:
261 if upload_task
.state
== state
:
264 yield from asyncio
.sleep(.01)
266 raise asyncio
.TimeoutError()
268 self
.image_mock
.do_read_slow
= True
270 with self
.create_upload_task(self
.account
) as upload_task
:
272 yield from wait_for_task_state(upload_task
, "UPLOADING")
274 self
.assertEqual("CANCELLING", upload_task
.state
)
275 yield from wait_for_task_state(upload_task
, "CANCELLED")
278 def test_create_image_failed(self
):
279 self
.image_mock
.do_fail
= True
281 with self
.create_upload_task(self
.account
) as upload_task
:
282 yield from upload_task
.start()
284 self
.assertEqual("FAILED", upload_task
.state
)
287 def test_create_image_name_and_checksum_exists(self
):
288 with self
.create_upload_task(self
.account
) as upload_task
:
289 image_entry
= RwcalYang
.ImageInfoItem(
291 name
=upload_task
.image_name
,
292 checksum
=upload_task
.image_checksum
294 self
.image_mock
.add_existing_image(image_entry
)
296 yield from upload_task
.start()
298 # No image should have been uploaded, since the name and checksum
299 self
.assertEqual(self
.image_mock
.image_checksum
, None)
301 self
.assertEqual("COMPLETED", upload_task
.state
)
302 self
.assertTrue("UPLOADING" not in upload_task
.state_stack
)
305 class TestUploadJob(unittest
.TestCase
, UploadTaskMixin
, ImageMockMixin
):
306 def __init__(self
, *args
, **kwargs
):
307 self
._loop
= asyncio
.get_event_loop()
308 self
._log
= logging
.getLogger(__file__
)
310 ImageMockMixin
.__init
__(self
, self
._log
)
311 UploadTaskMixin
.__init
__(self
, self
._log
, self
._loop
)
312 unittest
.TestCase
.__init
__(self
, *args
, **kwargs
)
315 def test_single_task_upload_job(self
):
316 with self
.create_upload_task(self
.account
) as upload_task
:
317 job
= upload
.ImageUploadJob(self
._log
, self
._loop
, [upload_task
])
318 self
.assertEqual("QUEUED", job
.state
)
319 yield from job
.start()
321 self
.assertIn("QUEUED", job
.state_stack
)
322 self
.assertIn("IN_PROGRESS", job
.state_stack
)
323 self
.assertIn("COMPLETED", job
.state_stack
)
325 self
.assertEqual("COMPLETED", job
.state
)
327 job_pb_msg
= job
.pb_msg
328 self
.assertEqual("COMPLETED", job_pb_msg
.status
)
331 def test_multiple_tasks_upload_job(self
):
332 with self
.create_upload_task(self
.account
) as upload_task1
:
333 with self
.create_upload_task(self
.account
) as upload_task2
:
334 job
= upload
.ImageUploadJob(
335 self
._log
, self
._loop
, [upload_task1
, upload_task2
])
336 yield from job
.start()
338 self
.assertEqual("COMPLETED", job
.state
)
341 def test_failed_task_in_job(self
):
342 self
.image_mock
.do_fail
= True
344 with self
.create_upload_task(self
.account
) as upload_task
:
345 job
= upload
.ImageUploadJob(
346 self
._log
, self
._loop
, [upload_task
])
347 yield from job
.start()
349 self
.assertEqual("FAILED", job
.state
)
352 def test_cancel_job(self
):
354 def wait_for_job_state(upload_job
, state
, timeout
=10):
355 start_time
= time
.time()
356 while (time
.time() - start_time
) < timeout
:
357 if upload_job
.state
== state
:
360 yield from asyncio
.sleep(.01)
362 raise asyncio
.TimeoutError()
364 self
.image_mock
.do_read_slow
= True
366 with self
.create_upload_task(self
.account
) as upload_task
:
367 job
= upload
.ImageUploadJob(
368 self
._log
, self
._loop
, [upload_task
])
370 yield from wait_for_job_state(job
, "IN_PROGRESS")
372 self
.assertEqual("CANCELLING", job
.state
)
373 yield from wait_for_job_state(job
, "CANCELLED")
375 self
.assertEqual("CANCELLED", job
.state
)
378 class TestUploadJobController(unittest
.TestCase
, UploadTaskMixin
, ImageMockMixin
):
379 def __init__(self
, *args
, **kwargs
):
380 self
._loop
= asyncio
.get_event_loop()
381 self
._log
= logging
.getLogger(__file__
)
383 ImageMockMixin
.__init
__(self
, self
._log
)
384 unittest
.TestCase
.__init
__(self
, *args
, **kwargs
)
387 def test_controller_single_task_job(self
):
388 controller
= upload
.ImageUploadJobController(
389 self
._log
, self
._loop
392 with self
.create_upload_task(self
.account
) as upload_task
:
393 job_id
= controller
.create_job([upload_task
])
394 self
.assertEqual(len(controller
.active_jobs
), 1)
395 self
.assertEqual(len(controller
.completed_jobs
), 0)
397 job
= controller
.get_job(job_id
)
398 yield from job
.wait()
400 self
.assertEqual(len(controller
.active_jobs
), 0)
401 self
.assertEqual(len(controller
.completed_jobs
), 1)
403 upload_jobs_pb_msg
= controller
.pb_msg
404 self
.assertEqual(len(upload_jobs_pb_msg
.job
), 1)
407 def test_controller_multi_task_job(self
):
408 controller
= upload
.ImageUploadJobController(
409 self
._log
, self
._loop
412 with self
.create_upload_task(self
.account
) as upload_task1
:
413 with self
.create_upload_task(self
.account
) as upload_task2
:
414 job_id
= controller
.create_job([upload_task1
, upload_task2
])
415 self
.assertEqual(len(controller
.active_jobs
), 1)
416 self
.assertEqual(len(controller
.completed_jobs
), 0)
418 job
= controller
.get_job(job_id
)
419 yield from job
.wait()
420 self
.assertEqual(len(controller
.active_jobs
), 0)
421 self
.assertEqual(len(controller
.completed_jobs
), 1)
424 def test_controller_multi_jobs(self
):
425 controller
= upload
.ImageUploadJobController(
426 self
._log
, self
._loop
429 with self
.create_upload_task(self
.account
) as upload_task1
:
430 with self
.create_upload_task(self
.account
) as upload_task2
:
431 job1_id
= controller
.create_job([upload_task1
])
432 job2_id
= controller
.create_job([upload_task2
])
433 self
.assertEqual(len(controller
.active_jobs
), 2)
434 self
.assertEqual(len(controller
.completed_jobs
), 0)
436 job1
= controller
.get_job(job1_id
)
437 job2
= controller
.get_job(job2_id
)
439 yield from asyncio
.wait(
440 [job1
.wait(), job2
.wait()],
444 self
.assertEqual(len(controller
.active_jobs
), 0)
445 self
.assertEqual(len(controller
.completed_jobs
), 2)
448 class TestRateCalc(unittest
.TestCase
):
449 def test_no_smoothing(self
):
450 calc
= upload
.ByteRateCalculator(1)
451 self
.assertEqual(0, calc
.rate
)
452 calc
.add_measurement(100, 1)
453 self
.assertEqual(100, calc
.rate
)
454 calc
.add_measurement(400, 2)
455 self
.assertEqual(200, calc
.rate
)
457 def test_smoothing(self
):
458 calc
= upload
.ByteRateCalculator(2)
459 calc
.add_measurement(100, 1)
460 self
.assertEqual(100, calc
.rate
)
462 calc
.add_measurement(400, 2)
463 self
.assertEqual(150, calc
.rate
)
465 calc
.add_measurement(400, 2)
466 self
.assertEqual(175, calc
.rate
)
469 class TestUploadProgress(unittest
.TestCase
):
471 self
._loop
= asyncio
.get_event_loop()
472 self
._log
= logging
.getLogger(__file__
)
474 def test_write_proxy(self
):
475 mem_hdl
= io
.BytesIO()
476 proxy
= upload
.UploadProgressWriteProxy(self
._log
, self
._loop
, 1000, mem_hdl
)
481 self
.assertEqual(data
, mem_hdl
.getvalue())
482 self
.assertEqual(len(data
), proxy
.bytes_written
)
483 self
.assertEqual(1000, proxy
.bytes_total
)
484 self
.assertEqual(1, proxy
.progress_percent
)
487 self
.assertTrue(mem_hdl
.closed
)
490 def main(argv
=sys
.argv
[1:]):
491 logging
.basicConfig(format
='TEST %(message)s')
493 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
494 parser
= argparse
.ArgumentParser()
495 parser
.add_argument('-v', '--verbose', action
='store_true')
496 parser
.add_argument('-n', '--no-runner', action
='store_true')
498 args
, unknown
= parser
.parse_known_args(argv
)
502 # Set the global logging level
503 logging
.getLogger().setLevel(logging
.DEBUG
if args
.verbose
else logging
.ERROR
)
505 # The unittest framework requires a program name, so use the name of this
506 # file instead (we do not want to have to pass a fake program name to main
507 # when this is called from the interpreter).
508 unittest
.main(argv
=[__file__
] + unknown
+ ["-v"], testRunner
=runner
)
510 if __name__
== '__main__':