32a36a2bc1740b9704b39dafc01166b6429808fe
4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
33 from rift
.mano
import cloud
34 from rift
.tasklets
.rwimagemgr
import upload
35 from rift
.package
import checksums
36 from rift
.test
.dts
import async_test
37 from rift
.mano
.utils
.project
import ManoProject
, DEFAULT_PROJECT
41 gi
.require_version('RwCal', '1.0')
42 gi
.require_version('RwcalYang', '1.0')
43 gi
.require_version('RwCloudYang', '1.0')
44 gi
.require_version('RwLog', '1.0')
45 gi
.require_version('RwTypes', '1.0')
46 from gi
.repository
import (
54 rwstatus
= rw_status
.rwstatus_from_exc_map({
55 IndexError: RwTypes
.RwStatus
.NOTFOUND
,
56 KeyError: RwTypes
.RwStatus
.NOTFOUND
,
60 class CreateImageMock(object):
61 def __init__(self
, log
):
63 self
.image_name
= None
64 self
.image_checksum
= None
67 self
.do_read_slow
= False
def add_existing_image(self, image_msg):
    """Append an image message to the list this mock keeps.

    The stored messages are what do_get_image_list() later copies into
    the list it returns.

    Arguments:
        image_msg - the image message to store
    """
    self._log.debug("Appending existing image msg: %s", image_msg)
    known_images = self._image_msgs
    known_images.append(image_msg)
76 def do_create_image(self
, _
, image
):
78 self
._log
.debug("Simulating failure")
79 raise ValueError("FAILED")
81 if not image
.has_field("fileno"):
82 raise ValueError("Image must have fileno")
84 self
.image_name
= image
.name
86 # Create a duplicate file descriptor to allow this code to own
87 # its own descriptor (and thus close it) and allow the client to
88 # own and close its own descriptor.
89 new_fileno
= os
.dup(image
.fileno
)
90 with os
.fdopen(new_fileno
, 'rb') as hdl
:
91 bytes_hdl
= io
.BytesIO()
93 self
._log
.debug("slow reading from mock cal")
97 d
= os
.read(new_fileno
, 1024)
101 self
._log
.debug("read %s bytes", num_bytes
)
106 except Exception as e
:
107 self
._log
.warning("caught exception when reading: %s",
112 bytes_hdl
.write(hdl
.read())
115 self
.image_checksum
= checksums
.checksum(bytes_hdl
)
118 return str(uuid
.uuid4())
def do_get_image_list(self, account):
    """Build the image list handed back in place of the CAL get_image_list call.

    Arguments:
        account - accepted for call compatibility; not read by this mock

    Returns:
        A new RwcalYang.VimResources whose imageinfo_list has had every
        message in self._image_msgs appended, in iteration order.
    """
    vim_resources = RwcalYang.VimResources()
    for image_msg in self._image_msgs:
        vim_resources.imageinfo_list.append(image_msg)

    return vim_resources
129 def create_random_image_file():
130 with
open("/dev/urandom", "rb") as rand_hdl
:
131 file_hdl
= tempfile
.NamedTemporaryFile("r+b")
132 file_hdl
.write(rand_hdl
.read(1 * 1024 * 1024))
138 def get_file_hdl_gen(file_hdl
):
141 d
= file_hdl
.read(64)
151 def get_image_checksum(image_hdl
):
152 image_checksum
= checksums
.checksum(image_hdl
)
154 return image_checksum
157 def create_image_info(image_name
, image_checksum
):
158 image
= RwcalYang
.ImageInfoItem()
159 image
.name
= image_name
160 image
.checksum
= image_checksum
161 image
.disk_format
= os
.path
.splitext(image_name
)[1][1:]
162 image
.container_format
= "bare"
167 class UploadTaskMixin(object):
168 def __init__(self
, log
, loop
):
172 def create_image_hdl(self
):
173 image_hdl
= create_random_image_file()
177 @contextlib.contextmanager
178 def create_upload_task(self
, account
, image_name
="test.qcow2",
179 image_checksum
=None, image_info
=None):
181 with self
.create_image_hdl() as image_hdl
:
183 image_checksum
= get_image_checksum(image_hdl
) \
184 if image_checksum
is None else image_checksum
186 image_info
= create_image_info(image_name
, image_checksum
) \
187 if image_info
is None else image_info
189 iter_hdl
= get_file_hdl_gen(image_hdl
)
190 pipe_gen
= upload
.GlanceImagePipeGen(self
._log
, self
._loop
, iter_hdl
)
192 upload_task
= upload
.AccountImageUploadTask(
193 self
._log
, self
._loop
, account
, image_info
, pipe_gen
.read_hdl
,
194 write_canceller
=pipe_gen
201 class ImageMockMixin(object):
202 ACCOUNT_MSG
= RwCloudYang
.CloudAcc(
207 def __init__(self
, log
):
209 self
._account
= cloud
.CloudAccount(
211 RwLog
.Ctx
.new(__file__
), ImageMockMixin
.ACCOUNT_MSG
214 self
._create
_image
_mock
= CreateImageMock(self
._log
)
216 # Mock the create_image call
217 self
._account
.cal
.create_image
= self
._create
_image
_mock
.do_create_image
218 self
._account
.cal
.get_image_list
= self
._create
_image
_mock
.do_get_image_list
def image_mock(self):
    """Return the CreateImageMock whose methods replace the account's CAL calls."""
    create_image_mock = self._create_image_mock
    return create_image_mock
229 class TestImageUploadTask(unittest
.TestCase
, UploadTaskMixin
, ImageMockMixin
):
def __init__(self, *args, **kwargs):
    """Create the loop and logger, then initialise both mixins and TestCase.

    All positional and keyword arguments are forwarded unchanged to
    unittest.TestCase.__init__.
    """
    event_loop = asyncio.get_event_loop()
    logger = logging.getLogger(__file__)
    self._loop, self._log = event_loop, logger

    # The mixin constructors have different signatures (log vs. log, loop),
    # so each base is initialised explicitly, in this order.
    ImageMockMixin.__init__(self, logger)
    UploadTaskMixin.__init__(self, logger, event_loop)
    unittest.TestCase.__init__(self, *args, **kwargs)
239 def test_upload_image_task(self
):
240 with self
.create_upload_task(self
.account
) as upload_task
:
241 yield from upload_task
.start()
243 self
.assertIn("QUEUED", upload_task
.state_stack
)
244 self
.assertIn("CHECK_IMAGE_EXISTS", upload_task
.state_stack
)
245 self
.assertIn("UPLOADING", upload_task
.state_stack
)
246 self
.assertIn("COMPLETED", upload_task
.state_stack
)
248 self
.assertEqual("COMPLETED", upload_task
.state
)
250 self
.assertEqual(self
.image_mock
.image_name
, upload_task
.image_name
)
251 self
.assertEqual(self
.image_mock
.image_checksum
, upload_task
.image_checksum
)
253 task_pb_msg
= upload_task
.pb_msg
254 self
.assertEqual(upload_task
.image_name
, task_pb_msg
.image_name
)
257 @unittest.skip("Causes coredump in OSM")
259 def test_cancel_image_task(self
):
261 def wait_for_task_state(upload_task
, state
, timeout
=10):
262 start_time
= time
.time()
263 while (time
.time() - start_time
) < timeout
:
264 if upload_task
.state
== state
:
267 yield from asyncio
.sleep(.01)
269 raise asyncio
.TimeoutError()
271 self
.image_mock
.do_read_slow
= True
273 with self
.create_upload_task(self
.account
) as upload_task
:
275 yield from wait_for_task_state(upload_task
, "UPLOADING")
277 self
.assertEqual("CANCELLING", upload_task
.state
)
278 yield from wait_for_task_state(upload_task
, "CANCELLED")
281 def test_create_image_failed(self
):
282 self
.image_mock
.do_fail
= True
284 with self
.create_upload_task(self
.account
) as upload_task
:
285 yield from upload_task
.start()
287 self
.assertEqual("FAILED", upload_task
.state
)
290 def test_create_image_name_and_checksum_exists(self
):
291 with self
.create_upload_task(self
.account
) as upload_task
:
292 image_entry
= RwcalYang
.ImageInfoItem(
294 name
=upload_task
.image_name
,
295 checksum
=upload_task
.image_checksum
297 self
.image_mock
.add_existing_image(image_entry
)
299 yield from upload_task
.start()
301 # No image should have been uploaded, since an image with the same name and checksum already exists
302 self
.assertEqual(self
.image_mock
.image_checksum
, None)
304 self
.assertEqual("COMPLETED", upload_task
.state
)
305 self
.assertTrue("UPLOADING" not in upload_task
.state_stack
)
308 class TestUploadJob(unittest
.TestCase
, UploadTaskMixin
, ImageMockMixin
):
def __init__(self, *args, **kwargs):
    """Prepare the shared loop and logger before initialising every base.

    Positional and keyword arguments are passed straight through to
    unittest.TestCase.__init__.
    """
    test_loop = asyncio.get_event_loop()
    test_log = logging.getLogger(__file__)
    self._loop = test_loop
    self._log = test_log

    # Bases are initialised one by one because their constructors take
    # different arguments: (log), (log, loop) and TestCase's own.
    ImageMockMixin.__init__(self, test_log)
    UploadTaskMixin.__init__(self, test_log, test_loop)
    unittest.TestCase.__init__(self, *args, **kwargs)
318 def test_single_task_upload_job(self
):
319 with self
.create_upload_task(self
.account
) as upload_task
:
320 job
= upload
.ImageUploadJob(self
._log
, self
._loop
, [upload_task
])
321 self
.assertEqual("QUEUED", job
.state
)
322 yield from job
.start()
324 self
.assertIn("QUEUED", job
.state_stack
)
325 self
.assertIn("IN_PROGRESS", job
.state_stack
)
326 self
.assertIn("COMPLETED", job
.state_stack
)
328 self
.assertEqual("COMPLETED", job
.state
)
330 job_pb_msg
= job
.pb_msg
331 self
.assertEqual("COMPLETED", job_pb_msg
.status
)
334 def test_multiple_tasks_upload_job(self
):
335 with self
.create_upload_task(self
.account
) as upload_task1
:
336 with self
.create_upload_task(self
.account
) as upload_task2
:
337 job
= upload
.ImageUploadJob(
338 self
._log
, self
._loop
, [upload_task1
, upload_task2
])
339 yield from job
.start()
341 self
.assertEqual("COMPLETED", job
.state
)
344 def test_failed_task_in_job(self
):
345 self
.image_mock
.do_fail
= True
347 with self
.create_upload_task(self
.account
) as upload_task
:
348 job
= upload
.ImageUploadJob(
349 self
._log
, self
._loop
, [upload_task
])
350 yield from job
.start()
352 self
.assertEqual("FAILED", job
.state
)
355 @unittest.skip("Causes coredump in OSM")
357 def test_cancel_job(self
):
359 def wait_for_job_state(upload_job
, state
, timeout
=10):
360 start_time
= time
.time()
361 while (time
.time() - start_time
) < timeout
:
362 if upload_job
.state
== state
:
365 yield from asyncio
.sleep(.01)
367 raise asyncio
.TimeoutError()
369 self
.image_mock
.do_read_slow
= True
371 with self
.create_upload_task(self
.account
) as upload_task
:
372 job
= upload
.ImageUploadJob(
373 self
._log
, self
._loop
, [upload_task
])
375 yield from wait_for_job_state(job
, "IN_PROGRESS")
377 self
.assertEqual("CANCELLING", job
.state
)
378 yield from wait_for_job_state(job
, "CANCELLED")
380 self
.assertEqual("CANCELLED", job
.state
)
383 class TestUploadJobController(unittest
.TestCase
, UploadTaskMixin
, ImageMockMixin
):
def __init__(self, *args, **kwargs):
    """Create loop, logger and default project, then initialise the bases.

    Only ImageMockMixin and unittest.TestCase are initialised here;
    this constructor sets _loop and _log itself. Positional and keyword
    arguments are forwarded unchanged to unittest.TestCase.__init__.
    """
    event_loop = asyncio.get_event_loop()
    logger = logging.getLogger(__file__)
    self._loop = event_loop
    self._log = logger

    # The project is built from the same logger and the default project name.
    self._project = ManoProject(logger, name=DEFAULT_PROJECT)

    ImageMockMixin.__init__(self, logger)
    unittest.TestCase.__init__(self, *args, **kwargs)
392 def test_controller_single_task_job(self
):
393 controller
= upload
.ImageUploadJobController(
394 self
._log
, self
._loop
, self
._project
,
397 with self
.create_upload_task(self
.account
) as upload_task
:
398 job_id
= controller
.create_job([upload_task
])
399 self
.assertEqual(len(controller
.active_jobs
), 1)
400 self
.assertEqual(len(controller
.completed_jobs
), 0)
402 job
= controller
.get_job(job_id
)
403 yield from job
.wait()
405 self
.assertEqual(len(controller
.active_jobs
), 0)
406 self
.assertEqual(len(controller
.completed_jobs
), 1)
408 upload_jobs_pb_msg
= controller
.pb_msg
409 self
.assertEqual(len(upload_jobs_pb_msg
.job
), 1)
412 def test_controller_multi_task_job(self
):
413 controller
= upload
.ImageUploadJobController(
414 self
._log
, self
._loop
, self
._project
417 with self
.create_upload_task(self
.account
) as upload_task1
:
418 with self
.create_upload_task(self
.account
) as upload_task2
:
419 job_id
= controller
.create_job([upload_task1
, upload_task2
])
420 self
.assertEqual(len(controller
.active_jobs
), 1)
421 self
.assertEqual(len(controller
.completed_jobs
), 0)
423 job
= controller
.get_job(job_id
)
424 yield from job
.wait()
425 self
.assertEqual(len(controller
.active_jobs
), 0)
426 self
.assertEqual(len(controller
.completed_jobs
), 1)
429 def test_controller_multi_jobs(self
):
430 controller
= upload
.ImageUploadJobController(
431 self
._log
, self
._loop
, self
._project
,
434 with self
.create_upload_task(self
.account
) as upload_task1
:
435 with self
.create_upload_task(self
.account
) as upload_task2
:
436 job1_id
= controller
.create_job([upload_task1
])
437 job2_id
= controller
.create_job([upload_task2
])
438 self
.assertEqual(len(controller
.active_jobs
), 2)
439 self
.assertEqual(len(controller
.completed_jobs
), 0)
441 job1
= controller
.get_job(job1_id
)
442 job2
= controller
.get_job(job2_id
)
444 yield from asyncio
.wait(
445 [job1
.wait(), job2
.wait()],
449 self
.assertEqual(len(controller
.active_jobs
), 0)
450 self
.assertEqual(len(controller
.completed_jobs
), 2)
class TestRateCalc(unittest.TestCase):
    """Checks upload.ByteRateCalculator.rate after successive measurements."""

    def test_no_smoothing(self):
        """A calculator built with 1 reports 0, then 100, then 200."""
        rate_calc = upload.ByteRateCalculator(1)
        self.assertEqual(0, rate_calc.rate)

        # Each step: (add_measurement arguments, rate expected right after).
        steps = (
            ((100, 1), 100),
            ((400, 2), 200),
        )
        for measurement, expected_rate in steps:
            rate_calc.add_measurement(*measurement)
            self.assertEqual(expected_rate, rate_calc.rate)

    def test_smoothing(self):
        """A calculator built with 2 reports 100, then 150, then 175."""
        rate_calc = upload.ByteRateCalculator(2)

        # Each step: (add_measurement arguments, rate expected right after).
        steps = (
            ((100, 1), 100),
            ((400, 2), 150),
            ((400, 2), 175),
        )
        for measurement, expected_rate in steps:
            rate_calc.add_measurement(*measurement)
            self.assertEqual(expected_rate, rate_calc.rate)
474 class TestUploadProgress(unittest
.TestCase
):
476 self
._loop
= asyncio
.get_event_loop()
477 self
._log
= logging
.getLogger(__file__
)
479 def test_write_proxy(self
):
480 mem_hdl
= io
.BytesIO()
481 proxy
= upload
.UploadProgressWriteProxy(self
._log
, self
._loop
, 1000, mem_hdl
)
486 self
.assertEqual(data
, mem_hdl
.getvalue())
487 self
.assertEqual(len(data
), proxy
.bytes_written
)
488 self
.assertEqual(1000, proxy
.bytes_total
)
489 self
.assertEqual(1, proxy
.progress_percent
)
492 self
.assertTrue(mem_hdl
.closed
)
495 def main(argv
=sys
.argv
[1:]):
496 logging
.basicConfig(format
='TEST %(message)s')
498 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
499 parser
= argparse
.ArgumentParser()
500 parser
.add_argument('-v', '--verbose', action
='store_true')
501 parser
.add_argument('-n', '--no-runner', action
='store_true')
503 args
, unknown
= parser
.parse_known_args(argv
)
507 # Set the global logging level
508 logging
.getLogger().setLevel(logging
.DEBUG
if args
.verbose
else logging
.ERROR
)
510 # The unittest framework requires a program name, so use the name of this
511 # file instead (we do not want to have to pass a fake program name to main
512 # when this is called from the interpreter).
513 unittest
.main(argv
=[__file__
] + unknown
+ ["-v"], testRunner
=runner
)
515 if __name__
== '__main__':