# Copyright 2016 RIFT.IO Inc
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
25 import rift
.mano
.cloud
28 gi
.require_version('RwImageMgmtYang', '1.0')
29 from gi
.repository
import (
34 class UploadJobError(Exception):
38 class ImageUploadTaskError(Exception):
42 class ImageUploadError(ImageUploadTaskError
):
46 class ImageListError(ImageUploadTaskError
):
50 class ImageUploadJobController(object):
51 """ This class starts and manages ImageUploadJobs """
52 MAX_COMPLETED_JOBS
= 20
54 def __init__(self
, log
, loop
, max_completed_jobs
=MAX_COMPLETED_JOBS
):
57 self
._job
_id
_gen
= itertools
.count(1)
58 self
._max
_completed
_jobs
= max_completed_jobs
61 self
._completed
_jobs
= collections
.deque(
62 maxlen
=self
._max
_completed
_jobs
67 """ the UploadJobs protobuf message """
68 upload_jobs_msg
= RwImageMgmtYang
.UploadJobs()
69 for job
in self
._jobs
.values():
70 upload_jobs_msg
.job
.append(job
.pb_msg
)
72 return upload_jobs_msg
76 """ the tracked list of ImageUploadJobs """
77 return self
._jobs
.values()
def completed_jobs(self):
    """ completed jobs in the tracked list of ImageUploadJobs

    Returns:
        A list of the tracked jobs (in tracked-mapping order) which are
        also present in the completed jobs collection.
    """
    finished = []
    for tracked_job in self._jobs.values():
        if tracked_job in self._completed_jobs:
            finished.append(tracked_job)

    return finished
def active_jobs(self):
    """ in-progress jobs in the tracked list of ImageUploadJobs

    Returns:
        A list of the tracked jobs (in tracked-mapping order) which are
        not present in the completed jobs collection.
    """
    in_progress = []
    for tracked_job in self._jobs.values():
        if tracked_job not in self._completed_jobs:
            in_progress.append(tracked_job)

    return in_progress
def _add_job(self, job):
    """ Track a job in the controller, keyed by the job's id

    Arguments:
        job - the job to track; an existing entry with the same id
              is replaced
    """
    job_key = job.id
    self._jobs[job_key] = job
def _start_job(self, job, on_completed=None):
    """ Start a job and register the completion bookkeeping callbacks

    Arguments:
        job - the job to start; job.start() must return a future-like
              object which supports add_done_callback()
        on_completed - optional callback which is added to the job future
                       after the internal bookkeeping callback
    """
    def on_job_completed(_):
        self._log.debug("%s completed. Adding to completed jobs list.", job)

        completed = self._completed_jobs
        capacity = completed.maxlen

        if capacity == 0:
            # A zero-length completed list cannot retain anything, so the
            # finished job is dropped from the tracked jobs immediately
            # (indexing the empty deque would otherwise raise IndexError).
            self._jobs.pop(job.id, None)
            return

        # If adding a new completed job is going to overflow the
        # completed job list, find the first job that completed and
        # remove it from the tracked jobs.
        if capacity is not None and len(completed) >= capacity:
            first_completed_job = completed.pop()
            # pop() instead of del: tolerate an id that is no longer tracked
            self._jobs.pop(first_completed_job.id, None)

        completed.appendleft(job)

    job_future = job.start()
    job_future.add_done_callback(on_job_completed)

    if on_completed is not None:
        job_future.add_done_callback(on_completed)
def get_job(self, job_id):
    """ Get the UploadJob from the job id

    Arguments:
        job_id - the job id that was previously added to the controller

    Returns:
        The associated ImageUploadJob

    Raises:
        LookupError - Could not find the job id
    """
    if job_id not in self._jobs:
        # Wrap the id in a 1-tuple so that a tuple job_id is formatted
        # as a single value instead of breaking the % operator.
        raise LookupError("Could not find job_id %s" % (job_id,))

    return self._jobs[job_id]
128 def create_job(self
, image_tasks
, on_completed
=None):
129 """ Create and start a ImageUploadJob from a list of ImageUploadTasks
132 image_tasks - a list of ImageUploadTasks
133 on_completed - a callback which is added to the job future
138 self
._log
.debug("Creating new job from %s image tasks", len(image_tasks
))
139 new_job
= ImageUploadJob(
143 job_id
=next(self
._job
_id
_gen
)
146 self
._add
_job
(new_job
)
147 self
._start
_job
(new_job
, on_completed
=on_completed
)
152 class ImageUploadJob(object):
153 """ This class manages a set of ImageUploadTasks
155 In order to push an image (or set of images) to many cloud accounts, and get a single
156 status on that operation, we need a single status that represents all of those tasks.
158 The ImageUploadJob provides a single endpoint to control all the tasks and report
when all images are successfully uploaded or when any one fails.
161 STATES
= ("QUEUED", "IN_PROGRESS", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
162 TIMEOUT_JOB
= 6 * 60 * 60 # 6 hours
163 JOB_GEN
= itertools
.count(1)
165 def __init__(self
, log
, loop
, upload_tasks
, job_id
=None, timeout_job
=TIMEOUT_JOB
):
168 self
._upload
_tasks
= upload_tasks
169 self
._job
_id
= next(ImageUploadJob
.JOB_GEN
) if job_id
is None else job_id
170 self
._timeout
_job
= timeout_job
172 self
._state
= "QUEUED"
173 self
._state
_stack
= [self
._state
]
175 self
._start
_time
= time
.time()
178 self
._task
_future
_map
= {}
179 self
._job
_future
= None
182 return "{}(job_id={}, state={})".format(
183 self
.__class
__.__name
__, self
._job
_id
, self
._state
192 """ The state of the ImageUploadJob """
def state(self, new_state):
    """ Set the state of the ImageUploadJob

    Arguments:
        new_state - one of ImageUploadJob.STATES; it must not come before
                    the current state in that tuple

    The new state is appended to the state stack and then becomes the
    current state.
    """
    ordered_states = ImageUploadJob.STATES
    assert new_state in ordered_states
    assert ordered_states.index(self._state) <= ordered_states.index(new_state)

    self._state_stack.append(new_state)
    self._state = new_state
def state_stack(self):
    """ The list of states that this job progressed through

    Returns:
        The job's own state list (not a copy).
    """
    history = self._state_stack
    return history
212 """ The UploadJob protobuf message """
213 task
= RwImageMgmtYang
.UploadJob
.from_dict({
215 "status": self
._state
,
216 "start_time": self
._start
_time
,
217 "upload_tasks": [task
.pb_msg
for task
in self
._upload
_tasks
]
221 task
.stop_time
= self
._stop
_time
225 def _start_upload_tasks(self
):
226 self
._log
.debug("Starting %s upload tasks", len(self
._upload
_tasks
))
228 for upload_task
in self
._upload
_tasks
:
232 def _wait_for_upload_tasks(self
):
233 self
._log
.debug("Waiting for upload tasks to complete")
235 wait_coroutines
= [t
.wait() for t
in self
._upload
_tasks
]
237 yield from asyncio
.wait(
239 timeout
=self
._timeout
_job
,
243 self
._log
.debug("All upload tasks completed")
245 def _set_final_job_state(self
):
247 for task
in self
._upload
_tasks
:
248 if task
.state
!= "COMPLETED":
249 failed_tasks
.append(task
)
252 self
._log
.error("%s had %s FAILED tasks.", self
, len(failed_tasks
))
253 self
.state
= "FAILED"
255 self
._log
.debug("%s tasks completed successfully", len(self
._upload
_tasks
))
256 self
.state
= "COMPLETED"
259 def _cancel_job(self
):
260 for task
in self
._upload
_tasks
:
263 # TODO: Wait for all tasks to actually reach terminal
266 self
.state
= "CANCELLED"
270 self
.state
= "IN_PROGRESS"
271 self
._start
_upload
_tasks
()
273 yield from self
._wait
_for
_upload
_tasks
()
274 except asyncio
.CancelledError
:
275 self
._log
.debug("%s was cancelled. Cancelling all tasks.",
277 self
._loop
.create_task(self
._cancel
_job
())
280 self
._stop
_time
= time
.time()
281 self
._job
_future
= None
283 self
._set
_final
_job
_state
()
287 """ Wait for the job to reach a terminal state """
288 if self
._job
_future
is None:
289 raise UploadJobError("Job not started")
291 yield from asyncio
.wait_for(
298 """ Start the job and all child tasks """
299 if self
._state
!= "QUEUED":
300 raise UploadJobError("Job already started")
302 self
._job
_future
= self
._loop
.create_task(self
._do
_job
())
303 return self
._job
_future
306 """ Stop the job and all child tasks """
307 if self
._job
_future
is not None:
308 self
.state
= "CANCELLING"
309 self
._job
_future
.cancel()
312 class ByteRateCalculator(object):
313 """ This class produces a byte rate from inputted measurements"""
314 def __init__(self
, rate_time_constant
):
316 self
._time
_constant
= rate_time_constant
322 def add_measurement(self
, num_bytes
, time_delta
):
323 rate
= num_bytes
/ time_delta
327 self
._rate
+= ((rate
- self
._rate
) / self
._time
_constant
)
332 class UploadProgressWriteProxy(object):
""" This class implements a write proxy which produces various progress stats
335 In order to keep the complexity of the UploadTask down, this class acts as a
336 proxy for a file write. By providing the original handle to be written to
337 and having the client class call write() on this object, we can produce the
338 various statistics to be consumed.
340 RATE_TIME_CONSTANT
= 5
342 def __init__(self
, log
, loop
, bytes_total
, write_hdl
):
345 self
._bytes
_total
= bytes_total
346 self
._write
_hdl
= write_hdl
348 self
._bytes
_written
= 0
351 self
._rate
_calc
= ByteRateCalculator(UploadProgressWriteProxy
.RATE_TIME_CONSTANT
)
352 self
._rate
_task
= None
def write(self, data):
    """ Write data to the proxied handle and count the bytes

    Arguments:
        data - the data to pass to the underlying handle's write();
               len(data) is added to the bytes written counter
    """
    target_hdl = self._write_hdl
    target_hdl.write(data)
    self._bytes_written = self._bytes_written + len(data)
359 self
._write
_hdl
.close()
360 if self
._rate
_task
is not None:
361 self
._log
.debug("stopping rate monitoring task")
362 self
._rate
_task
.cancel()
364 def start_rate_monitoring(self
):
365 """ Start the rate monitoring task """
367 def periodic_rate_task():
369 start_time
= time
.time()
370 start_bytes
= self
._bytes
_written
371 yield from asyncio
.sleep(1, loop
=self
._loop
)
372 time_period
= time
.time() - start_time
373 num_bytes
= self
._bytes
_written
- start_bytes
375 self
._byte
_rate
= self
._rate
_calc
.add_measurement(num_bytes
, time_period
)
377 self
._log
.debug("starting rate monitoring task")
378 self
._rate
_task
= self
._loop
.create_task(periodic_rate_task())
381 def progress_percent(self
):
382 if self
._bytes
_total
== 0:
385 return int(self
._bytes
_written
/ self
._bytes
_total
* 100)
def bytes_written(self):
    """ The running count of bytes written through this proxy """
    written = self._bytes_written
    return written
def bytes_total(self):
    """ The total byte count this proxy was given at construction """
    total = self._bytes_total
    return total
def bytes_rate(self):
    """ The most recently stored byte rate value """
    current_rate = self._byte_rate
    return current_rate
400 class GlanceImagePipeGen(object):
401 """ This class produces a read file handle from a generator that produces bytes
403 The CAL API takes a file handle as an input. The Glance API creates a generator
404 that produces byte strings. This class acts as the mediator by creating a pipe
405 and pumping the bytestring from the generator into the write side of the pipe.
407 A pipe has the useful feature here that it will block at the buffer size until
408 the reader has consumed. This allows us to only pull from glance and push at the
409 pace of the reader preventing us from having to store the images locally on disk.
411 def __init__(self
, log
, loop
, data_gen
):
414 self
._data
_gen
= data_gen
416 read_fd
, write_fd
= os
.pipe()
418 self
._read
_hdl
= os
.fdopen(read_fd
, 'rb')
419 self
._write
_hdl
= os
.fdopen(write_fd
, 'wb')
420 self
._close
_hdl
= self
._write
_hdl
424 return self
._write
_hdl
def write_hdl(self, new_write_hdl):
    """ Replace the handle that generator data is written to

    Arguments:
        new_write_hdl - the object to use as the write handle from now on
    """
    setattr(self, "_write_hdl", new_write_hdl)
432 return self
._read
_hdl
434 def _gen_writer(self
):
435 self
._log
.debug("starting image data write to pipe")
437 for data
in self
._data
_gen
:
439 self
._write
_hdl
.write(data
)
440 except (BrokenPipeError
, ValueError) as e
:
441 self
._log
.warning("write pipe closed: %s", str(e
))
444 except Exception as e
:
445 self
._log
.exception("error when writing data to pipe: %s", str(e
))
448 self
._log
.debug("closing write side of pipe")
450 self
._write
_hdl
.close()
455 t
= threading
.Thread(target
=self
._gen
_writer
)
460 self
._log
.debug("stop requested, closing write side of pipe")
461 self
._write
_hdl
.close()
464 class AccountImageUploadTask(object):
""" This class manages a create_image task from an image info and file handle
467 Manage the upload of a image to a configured cloud account.
469 STATES
= ("QUEUED", "CHECK_IMAGE_EXISTS", "UPLOADING", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
471 TIMEOUT_CHECK_EXISTS
= 10
472 TIMEOUT_IMAGE_UPLOAD
= 6 * 60 * 60 # 6 hours
474 def __init__(self
, log
, loop
, account
, image_info
, image_hdl
,
475 timeout_exists
=TIMEOUT_CHECK_EXISTS
, timeout_upload
=TIMEOUT_IMAGE_UPLOAD
,
476 progress_info
=None, write_canceller
=None
480 self
._account
= account
481 self
._image
_info
= image_info
.deep_copy()
482 self
._image
_hdl
= image_hdl
484 self
._timeout
_exists
= timeout_exists
485 self
._timeout
_upload
= timeout_upload
487 self
._progress
_info
= progress_info
488 self
._write
_canceller
= write_canceller
490 self
._state
= "QUEUED"
491 self
._state
_stack
= [self
._state
]
493 self
._detail
= "Task is waiting to be started"
494 self
._start
_time
= time
.time()
496 self
._upload
_future
= None
498 if not image_info
.has_field("name"):
499 raise ValueError("image info must have name field")
def state(self, new_state):
    """ Set the state of the AccountImageUploadTask

    Arguments:
        new_state - one of AccountImageUploadTask.STATES; it must not come
                    before the current state in that tuple

    The new state is appended to the state stack and then becomes the
    current state.
    """
    ordered_states = AccountImageUploadTask.STATES
    assert new_state in ordered_states
    assert ordered_states.index(self._state) <= ordered_states.index(new_state)

    self._state_stack.append(new_state)
    self._state = new_state
def state_stack(self):
    """ The list of states that this task progressed through

    Returns:
        The task's own state list (not a copy).
    """
    history = self._state_stack
    return history
""" The image id being uploaded """
521 return self
._image
_info
.id
def image_name(self):
    """ The name field of the image info being uploaded """
    info = self._image_info
    return info.name
def image_checksum(self):
    """ The checksum of the image being uploaded

    Returns:
        The image info's checksum, or None when the image info does not
        have the "checksum" field.
    """
    info = self._image_info
    if not info.has_field("checksum"):
        return None

    return info.checksum
def cloud_account(self):
    """ The name of the cloud account which the image is being uploaded to """
    account = self._account
    return account.name
543 """ The UploadTask protobuf message """
544 task
= RwImageMgmtYang
.UploadTask
.from_dict({
545 "cloud_account": self
.cloud_account
,
546 "image_id": self
.image_id
,
547 "image_name": self
.image_name
,
548 "status": self
.state
,
549 "detail": self
._detail
,
550 "start_time": self
._start
_time
,
553 if self
.image_checksum
is not None:
554 task
.image_checksum
= self
.image_checksum
557 task
.stop_time
= self
._stop
_time
559 if self
._progress
_info
:
560 task
.bytes_written
= self
._progress
_info
.bytes_written
561 task
.bytes_total
= self
._progress
_info
.bytes_total
562 task
.progress_percent
= self
._progress
_info
.progress_percent
563 task
.bytes_per_second
= self
._progress
_info
.bytes_rate
565 if self
.state
== "COMPLETED":
566 task
.progress_percent
= 100
570 def _get_account_images(self
):
572 self
._log
.debug("getting image list for account {}".format(self
._account
.name
))
574 account_images
= self
._account
.get_image_list()
575 except rift
.mano
.cloud
.CloudAccountCalError
as e
:
576 msg
= "could not get image list for account {}".format(self
._account
.name
)
578 raise ImageListError(msg
) from e
580 return account_images
582 def _has_existing_image(self
):
583 account
= self
._account
585 account_images
= self
._get
_account
_images
()
587 matching_images
= [i
for i
in account_images
if i
.name
== self
.image_name
]
589 if self
.image_checksum
is not None:
590 matching_images
= [i
for i
in matching_images
if i
.checksum
== self
.image_checksum
]
593 self
._log
.debug("found matching image with checksum in account %s",
597 self
._log
.debug("did not find matching image with checksum in account %s",
601 def _upload_image(self
):
602 image
= self
._image
_info
603 account
= self
._account
605 image
.fileno
= self
._image
_hdl
.fileno()
607 self
._log
.debug("uploading to account {}: {}".format(account
.name
, image
))
609 image
.id = account
.create_image(image
)
610 except rift
.mano
.cloud
.CloudAccountCalError
as e
:
611 msg
= "error when uploading image {} to cloud account: {}".format(image
.name
, str(e
))
613 raise ImageUploadError(msg
) from e
615 self
._log
.debug('uploaded image (id: {}) to account{}: {}'.format(
616 image
.id, account
.name
, image
.name
))
621 def _do_upload(self
):
623 self
.state
= "CHECK_IMAGE_EXISTS"
624 has_image
= yield from asyncio
.wait_for(
625 self
._loop
.run_in_executor(None, self
._has
_existing
_image
),
626 timeout
=self
._timeout
_exists
,
630 self
.state
= "COMPLETED"
631 self
._detail
= "Image already exists on destination"
634 self
.state
= "UPLOADING"
635 self
._detail
= "Uploading image"
637 # Note that if the upload times out, the upload thread may still
638 # stick around. We'll need another method of cancelling the task
639 # through the VALA interface.
640 image_id
= yield from asyncio
.wait_for(
641 self
._loop
.run_in_executor(None, self
._upload
_image
),
642 timeout
=self
._timeout
_upload
,
646 except asyncio
.CancelledError
as e
:
647 self
.state
= "CANCELLED"
648 self
._detail
= "Image upload cancelled"
650 except ImageUploadTaskError
as e
:
651 self
.state
= "FAILED"
652 self
._detail
= str(e
)
654 except asyncio
.TimeoutError
as e
:
655 self
.state
= "FAILED"
656 self
._detail
= "Timed out during upload task: %s" % str(e
)
659 # If the user does not provide a checksum and performs a URL source
660 # upload with an incorrect URL, then Glance does not indicate a failure
661 # and the CAL cannot detect an incorrect upload. In this case, use
662 # the bytes_written to detect a bad upload and mark the task as failed.
663 if self
._progress
_info
and self
._progress
_info
.bytes_written
== 0:
664 self
.state
= "FAILED"
665 self
._detail
= "No bytes written. Possible bad image source."
668 self
.state
= "COMPLETED"
669 self
._detail
= "Image successfully uploaded. Image id: %s" % image_id
672 self
._stop
_time
= time
.time()
673 self
._upload
_future
= None
677 """ Wait for the upload task to complete """
678 if self
._upload
_future
is None:
679 raise ImageUploadError("Task not started")
681 yield from asyncio
.wait_for(
683 self
._timeout
_upload
, loop
=self
._loop
687 """ Start the upload task """
688 if self
._state
!= "QUEUED":
689 raise ImageUploadError("Task already started")
691 self
._log
.info("Starting %s", self
)
693 self
._upload
_future
= self
._loop
.create_task(self
._do
_upload
())
695 return self
._upload
_future
698 """ Stop the upload task in progress """
699 if self
._upload
_future
is None:
700 self
._log
.warning("Cannot cancel %s. Not in progress.", self
)
703 self
.state
= "CANCELLING"
704 self
._detail
= "Cancellation has been requested"
706 self
._log
.info("Cancelling %s", self
)
707 self
._upload
_future
.cancel()
708 if self
._write
_canceller
is not None:
709 self
._write
_canceller
.stop()