3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
25 import rift
.mano
.cloud
28 gi
.require_version('RwImageMgmtYang', '1.0')
29 from gi
.repository
import (
34 class UploadJobError(Exception):
38 class ImageUploadTaskError(Exception):
42 class ImageUploadError(ImageUploadTaskError
):
46 class ImageListError(ImageUploadTaskError
):
50 class ImageUploadJobController(object):
51 """ This class starts and manages ImageUploadJobs """
52 MAX_COMPLETED_JOBS
= 20
54 def __init__(self
, project
, max_completed_jobs
=MAX_COMPLETED_JOBS
):
55 self
._log
= project
.log
56 self
._loop
= project
.loop
57 self
._project
= project
58 self
._job
_id
_gen
= itertools
.count(1)
59 self
._max
_completed
_jobs
= max_completed_jobs
62 self
._completed
_jobs
= collections
.deque(
63 maxlen
=self
._max
_completed
_jobs
68 """ the UploadJobs protobuf message """
69 upload_jobs_msg
= RwImageMgmtYang
.YangData_RwProject_Project_UploadJobs()
70 for job
in self
._jobs
.values():
71 upload_jobs_msg
.job
.append(job
.pb_msg
)
73 return upload_jobs_msg
77 """ the tracked list of ImageUploadJobs """
78 return self
._jobs
.values()
81 def completed_jobs(self
):
82 """ completed jobs in the tracked list of ImageUploadJobs """
83 return [job
for job
in self
._jobs
.values() if job
in self
._completed
_jobs
]
86 def active_jobs(self
):
87 """ in-progress jobs in the tracked list of ImageUploadJobs """
88 return [job
for job
in self
._jobs
.values() if job
not in self
._completed
_jobs
]
    def _add_job(self, job):
        # Track the job under its unique id so get_job() can find it later.
        self._jobs[job.id] = job
    def _start_job(self, job, on_completed=None):
        """ Start a tracked job and register completion bookkeeping.

        Arguments:
            job - the ImageUploadJob to start
            on_completed - optional callback also attached to the job future
        """
        def on_job_completed(_):
            # Invoked when the job future reaches a terminal state.
            self._log.debug("%s completed. Adding to completed jobs list.", job)

            # If adding a new completed job is going to overflow the
            # completed job list, find the first job that completed and
            # remove it from the tracked jobs.
            if len(self._completed_jobs) == self._completed_jobs.maxlen:
                # appendleft() puts the newest entry on the left, so the
                # oldest completed job sits at index -1.
                first_completed_job = self._completed_jobs[-1]
                del self._jobs[first_completed_job.id]

            self._completed_jobs.appendleft(job)

        job_future = job.start()
        job_future.add_done_callback(on_job_completed)

        if on_completed is not None:
            job_future.add_done_callback(on_completed)
112 def get_job(self
, job_id
):
113 """ Get the UploadJob from the job id
116 job_id - the job id that was previously added to the controller
119 The associated ImageUploadJob
122 LookupError - Could not find the job id
124 if job_id
not in self
._jobs
:
125 raise LookupError("Could not find job_id %s" % job_id
)
127 return self
._jobs
[job_id
]
129 def create_job(self
, image_tasks
, on_completed
=None):
130 """ Create and start a ImageUploadJob from a list of ImageUploadTasks
133 image_tasks - a list of ImageUploadTasks
134 on_completed - a callback which is added to the job future
139 self
._log
.debug("Creating new job from %s image tasks", len(image_tasks
))
140 new_job
= ImageUploadJob(
144 job_id
=next(self
._job
_id
_gen
)
147 self
._add
_job
(new_job
)
148 self
._start
_job
(new_job
, on_completed
=on_completed
)
153 class ImageUploadJob(object):
154 """ This class manages a set of ImageUploadTasks
156 In order to push an image (or set of images) to many cloud accounts, and get a single
157 status on that operation, we need a single status that represents all of those tasks.
159 The ImageUploadJob provides a single endpoint to control all the tasks and report
160 when all images are successfully upload or when any one fails.
162 STATES
= ("QUEUED", "IN_PROGRESS", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
163 TIMEOUT_JOB
= 6 * 60 * 60 # 6 hours
164 JOB_GEN
= itertools
.count(1)
166 def __init__(self
, log
, loop
, upload_tasks
, job_id
=None, timeout_job
=TIMEOUT_JOB
):
169 self
._upload
_tasks
= upload_tasks
170 self
._job
_id
= next(ImageUploadJob
.JOB_GEN
) if job_id
is None else job_id
171 self
._timeout
_job
= timeout_job
173 self
._state
= "QUEUED"
174 self
._state
_stack
= [self
._state
]
176 self
._start
_time
= time
.time()
179 self
._task
_future
_map
= {}
180 self
._job
_future
= None
183 return "{}(job_id={}, state={})".format(
184 self
.__class
__.__name
__, self
._job
_id
, self
._state
193 """ The state of the ImageUploadJob """
    def state(self, new_state):
        """ Set the state of the ImageUploadJob """
        states = ImageUploadJob.STATES
        # State may only move forward through STATES (monotonic index);
        # a backwards transition is a programming error.
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state
    def state_stack(self):
        """ The list of states that this job progressed through """
        # Oldest first; seeded with "QUEUED" at construction time.
        return self._state_stack
213 """ The UploadJob protobuf message """
214 task
= RwImageMgmtYang
.YangData_RwProject_Project_UploadJobs_Job
.from_dict({
216 "status": self
._state
,
217 "start_time": self
._start
_time
,
218 "upload_tasks": [task
.pb_msg
for task
in self
._upload
_tasks
]
222 task
.stop_time
= self
._stop
_time
226 def _start_upload_tasks(self
):
227 self
._log
.debug("Starting %s upload tasks", len(self
._upload
_tasks
))
229 for upload_task
in self
._upload
_tasks
:
233 def _wait_for_upload_tasks(self
):
234 self
._log
.debug("Waiting for upload tasks to complete")
236 wait_coroutines
= [t
.wait() for t
in self
._upload
_tasks
]
238 yield from asyncio
.wait(
240 timeout
=self
._timeout
_job
,
244 self
._log
.debug("All upload tasks completed")
246 def _set_final_job_state(self
):
248 for task
in self
._upload
_tasks
:
249 if task
.state
!= "COMPLETED":
250 failed_tasks
.append(task
)
253 self
._log
.error("%s had %s FAILED tasks.", self
, len(failed_tasks
))
254 for ftask
in failed_tasks
:
255 self
._log
.error("%s : Failed to upload image : %s to cloud_account : %s", self
, ftask
.image_name
, ftask
.cloud_account
)
256 self
.state
= "FAILED"
258 self
._log
.debug("%s tasks completed successfully", len(self
._upload
_tasks
))
259 self
.state
= "COMPLETED"
262 def _cancel_job(self
):
263 for task
in self
._upload
_tasks
:
266 # TODO: Wait for all tasks to actually reach terminal
269 self
.state
= "CANCELLED"
273 self
.state
= "IN_PROGRESS"
274 self
._start
_upload
_tasks
()
276 yield from self
._wait
_for
_upload
_tasks
()
277 except asyncio
.CancelledError
:
278 self
._log
.debug("%s was cancelled. Cancelling all tasks.",
280 self
._loop
.create_task(self
._cancel
_job
())
283 self
._stop
_time
= time
.time()
284 self
._job
_future
= None
286 self
._set
_final
_job
_state
()
290 """ Wait for the job to reach a terminal state """
291 if self
._job
_future
is None:
292 raise UploadJobError("Job not started")
294 yield from asyncio
.wait_for(
301 """ Start the job and all child tasks """
302 if self
._state
!= "QUEUED":
303 raise UploadJobError("Job already started")
305 self
._job
_future
= self
._loop
.create_task(self
._do
_job
())
306 return self
._job
_future
309 """ Stop the job and all child tasks """
310 if self
._job
_future
is not None:
311 self
.state
= "CANCELLING"
312 self
._job
_future
.cancel()
class ByteRateCalculator(object):
    """ This class produces a byte rate from inputted measurements"""

    def __init__(self, rate_time_constant):
        """ Arguments:
                rate_time_constant - smoothing constant; larger values make
                                     the reported rate respond more slowly.
        """
        self._rate = 0
        self._time_constant = rate_time_constant

    def add_measurement(self, num_bytes, time_delta):
        """ Fold one (bytes, seconds) sample into the smoothed rate.

        Returns the updated smoothed rate in bytes per second.
        """
        rate = num_bytes / time_delta
        if self._rate == 0:
            # First sample seeds the smoothed value directly.
            self._rate = rate
        else:
            # Exponential-style smoothing toward the newest sample.
            self._rate += ((rate - self._rate) / self._time_constant)

        return self._rate
335 class UploadProgressWriteProxy(object):
336 """ This class implements a write proxy with produces various progress stats
338 In order to keep the complexity of the UploadTask down, this class acts as a
339 proxy for a file write. By providing the original handle to be written to
340 and having the client class call write() on this object, we can produce the
341 various statistics to be consumed.
343 RATE_TIME_CONSTANT
= 5
345 def __init__(self
, log
, loop
, bytes_total
, write_hdl
):
348 self
._bytes
_total
= bytes_total
349 self
._write
_hdl
= write_hdl
351 self
._bytes
_written
= 0
354 self
._rate
_calc
= ByteRateCalculator(UploadProgressWriteProxy
.RATE_TIME_CONSTANT
)
355 self
._rate
_task
= None
    def write(self, data):
        """ Proxy a write to the wrapped handle, counting the bytes written. """
        self._write_hdl.write(data)
        self._bytes_written += len(data)
362 self
._write
_hdl
.close()
363 if self
._rate
_task
is not None:
364 self
._log
.debug("stopping rate monitoring task")
365 self
._rate
_task
.cancel()
367 def start_rate_monitoring(self
):
368 """ Start the rate monitoring task """
370 def periodic_rate_task():
373 start_time
= time
.time()
374 start_bytes
= self
._bytes
_written
375 yield from asyncio
.sleep(1, loop
=self
._loop
)
376 time_period
= time
.time() - start_time
377 num_bytes
= self
._bytes
_written
- start_bytes
379 self
._byte
_rate
= self
._rate
_calc
.add_measurement(num_bytes
, time_period
)
380 except asyncio
.CancelledError
:
381 self
._log
.debug("rate monitoring task cancelled")
383 self
._log
.debug("starting rate monitoring task")
384 self
._rate
_task
= self
._loop
.create_task(periodic_rate_task())
387 def progress_percent(self
):
388 if self
._bytes
_total
== 0:
391 return int(self
._bytes
_written
/ self
._bytes
_total
* 100)
    def bytes_written(self):
        """ The number of bytes written so far through this proxy """
        return self._bytes_written
    def bytes_total(self):
        """ The expected total number of bytes to be written """
        return self._bytes_total
    def bytes_rate(self):
        """ The most recent smoothed byte rate (see start_rate_monitoring) """
        return self._byte_rate
406 class GlanceImagePipeGen(object):
407 """ This class produces a read file handle from a generator that produces bytes
409 The CAL API takes a file handle as an input. The Glance API creates a generator
410 that produces byte strings. This class acts as the mediator by creating a pipe
411 and pumping the bytestring from the generator into the write side of the pipe.
413 A pipe has the useful feature here that it will block at the buffer size until
414 the reader has consumed. This allows us to only pull from glance and push at the
415 pace of the reader preventing us from having to store the images locally on disk.
417 def __init__(self
, log
, loop
, data_gen
):
420 self
._data
_gen
= data_gen
422 read_fd
, write_fd
= os
.pipe()
424 self
._read
_hdl
= os
.fdopen(read_fd
, 'rb')
425 self
._write
_hdl
= os
.fdopen(write_fd
, 'wb')
426 self
._close
_hdl
= self
._write
_hdl
433 return self
._write
_hdl
    def write_hdl(self, new_write_hdl):
        # Swap in a replacement write handle (e.g. a progress-tracking
        # proxy). The original pipe write end remains reachable through
        # self._close_hdl so it can still be closed.
        self._write_hdl = new_write_hdl
441 return self
._read
_hdl
443 def _gen_writer(self
):
444 self
._log
.debug("starting image data write to pipe")
446 for data
in self
._data
_gen
:
451 self
._write
_hdl
.write(data
)
452 except (BrokenPipeError
, ValueError) as e
:
453 self
._log
.warning("write pipe closed: %s", str(e
))
456 except Exception as e
:
457 self
._log
.exception("error when writing data to pipe: %s", str(e
))
460 self
._log
.debug("closing write side of pipe")
462 self
._write
_hdl
.close()
467 t
= threading
.Thread(target
=self
._gen
_writer
)
474 self
._log
.debug("stop requested, closing write side of pipe")
476 if self
._t
is not None:
477 self
._t
.join(timeout
=1)
480 class AccountImageUploadTask(object):
481 """ This class manages an create_image task from an image info and file handle
483 Manage the upload of a image to a configured cloud account.
485 STATES
= ("QUEUED", "CHECK_IMAGE_EXISTS", "UPLOADING", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
487 TIMEOUT_CHECK_EXISTS
= 10
488 TIMEOUT_IMAGE_UPLOAD
= 6 * 60 * 60 # 6 hours
490 def __init__(self
, log
, loop
, account
, image_info
, image_hdl
,
491 timeout_exists
=TIMEOUT_CHECK_EXISTS
, timeout_upload
=TIMEOUT_IMAGE_UPLOAD
,
492 progress_info
=None, write_canceller
=None
496 self
._account
= account
497 self
._image
_info
= image_info
.deep_copy()
498 self
._image
_hdl
= image_hdl
500 self
._timeout
_exists
= timeout_exists
501 self
._timeout
_upload
= timeout_upload
503 self
._progress
_info
= progress_info
504 self
._write
_canceller
= write_canceller
506 self
._state
= "QUEUED"
507 self
._state
_stack
= [self
._state
]
509 self
._detail
= "Task is waiting to be started"
510 self
._start
_time
= time
.time()
512 self
._upload
_future
= None
514 if not image_info
.has_field("name"):
515 raise ValueError("image info must have name field")
    def state(self, new_state):
        """ Set the state of this upload task """
        states = AccountImageUploadTask.STATES
        # Transitions must move forward through STATES (monotonic index);
        # a backwards transition is a programming error.
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state
    def state_stack(self):
        """ The list of states this task progressed through, oldest first """
        return self._state_stack
536 """ The image name being uploaded """
537 return self
._image
_info
.id
    def image_name(self):
        """ The image name being uploaded """
        # Read from the task's private copy of the image info.
        return self._image_info.name
545 def image_checksum(self
):
546 """ The image checksum being uploaded """
547 if self
._image
_info
.has_field("checksum"):
548 return self
._image
_info
.checksum
    def cloud_account(self):
        """ The cloud account name which the image is being uploaded to """
        return self._account.name
559 """ The UploadTask protobuf message """
560 task
= RwImageMgmtYang
.YangData_RwProject_Project_UploadJobs_Job_UploadTasks
.from_dict({
561 "cloud_account": self
.cloud_account
,
562 "image_id": self
.image_id
,
563 "image_name": self
.image_name
,
564 "status": self
.state
,
565 "detail": self
._detail
,
566 "start_time": self
._start
_time
,
569 if self
.image_checksum
is not None:
570 task
.image_checksum
= self
.image_checksum
573 task
.stop_time
= self
._stop
_time
575 if self
._progress
_info
:
576 task
.bytes_written
= self
._progress
_info
.bytes_written
577 task
.bytes_total
= self
._progress
_info
.bytes_total
578 task
.progress_percent
= self
._progress
_info
.progress_percent
579 task
.bytes_per_second
= self
._progress
_info
.bytes_rate
581 if self
.state
== "COMPLETED":
582 task
.progress_percent
= 100
586 def _get_account_images(self
):
588 self
._log
.debug("getting image list for account {}".format(self
._account
.name
))
590 account_images
= self
._account
.get_image_list()
591 except rift
.mano
.cloud
.CloudAccountCalError
as e
:
592 msg
= "could not get image list for account {}".format(self
._account
.name
)
594 raise ImageListError(msg
) from e
596 return account_images
598 def _has_existing_image(self
):
599 account
= self
._account
601 account_images
= self
._get
_account
_images
()
603 matching_images
= [i
for i
in account_images
if i
.name
== self
.image_name
]
605 if self
.image_checksum
is not None:
606 matching_images
= [i
for i
in matching_images
if i
.checksum
== self
.image_checksum
]
609 self
._log
.debug("found matching image with checksum in account %s",
613 self
._log
.debug("did not find matching image with checksum in account %s",
617 def _upload_image(self
):
618 image
= self
._image
_info
619 account
= self
._account
621 image
.fileno
= self
._image
_hdl
.fileno()
623 self
._log
.debug("uploading to account {}: {}".format(account
.name
, image
))
625 image
.id = account
.create_image(image
)
626 except rift
.mano
.cloud
.CloudAccountCalError
as e
:
627 msg
= "error when uploading image {} to cloud account: {}".format(image
.name
, str(e
))
629 raise ImageUploadError(msg
) from e
631 self
._log
.debug('uploaded image (id: {}) to account{}: {}'.format(
632 image
.id, account
.name
, image
.name
))
637 def _do_upload(self
):
639 self
.state
= "CHECK_IMAGE_EXISTS"
640 has_image
= yield from asyncio
.wait_for(
641 self
._loop
.run_in_executor(None, self
._has
_existing
_image
),
642 timeout
=self
._timeout
_exists
,
646 self
.state
= "COMPLETED"
647 self
._detail
= "Image already exists on destination"
650 self
.state
= "UPLOADING"
651 self
._detail
= "Uploading image"
653 # Note that if the upload times out, the upload thread may still
654 # stick around. We'll need another method of cancelling the task
655 # through the VALA interface.
656 image_id
= yield from asyncio
.wait_for(
657 self
._loop
.run_in_executor(None, self
._upload
_image
),
658 timeout
=self
._timeout
_upload
,
662 except asyncio
.CancelledError
as e
:
663 self
.state
= "CANCELLED"
664 self
._detail
= "Image upload cancelled"
666 except ImageUploadTaskError
as e
:
667 self
.state
= "FAILED"
668 self
._detail
= str(e
)
670 except asyncio
.TimeoutError
as e
:
671 self
.state
= "FAILED"
672 self
._detail
= "Timed out during upload task: %s" % str(e
)
675 # If the user does not provide a checksum and performs a URL source
676 # upload with an incorrect URL, then Glance does not indicate a failure
677 # and the CAL cannot detect an incorrect upload. In this case, use
678 # the bytes_written to detect a bad upload and mark the task as failed.
679 if self
._progress
_info
and self
._progress
_info
.bytes_written
== 0:
680 self
.state
= "FAILED"
681 self
._detail
= "No bytes written. Possible bad image source."
684 self
.state
= "COMPLETED"
685 self
._detail
= "Image successfully uploaded. Image id: %s" % image_id
688 self
._stop
_time
= time
.time()
689 self
._upload
_future
= None
693 """ Wait for the upload task to complete """
694 if self
._upload
_future
is None:
695 raise ImageUploadError("Task not started")
697 yield from asyncio
.wait_for(
699 self
._timeout
_upload
, loop
=self
._loop
703 """ Start the upload task """
704 if self
._state
!= "QUEUED":
705 raise ImageUploadError("Task already started")
707 self
._log
.info("Starting %s", self
)
709 self
._upload
_future
= self
._loop
.create_task(self
._do
_upload
())
711 return self
._upload
_future
714 """ Stop the upload task in progress """
715 if self
._upload
_future
is None:
716 self
._log
.warning("Cannot cancel %s. Not in progress.", self
)
719 self
.state
= "CANCELLING"
720 self
._detail
= "Cancellation has been requested"
722 self
._log
.info("Cancelling %s", self
)
723 self
._upload
_future
.cancel()
724 if self
._write
_canceller
is not None:
725 self
._write
_canceller
.stop()