3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
25 import rift
.mano
.cloud
28 gi
.require_version('RwImageMgmtYang', '1.0')
29 from gi
.repository
import (
34 class UploadJobError(Exception):
38 class ImageUploadTaskError(Exception):
42 class ImageUploadError(ImageUploadTaskError
):
46 class ImageListError(ImageUploadTaskError
):
50 class ImageUploadJobController(object):
51 """ This class starts and manages ImageUploadJobs """
52 MAX_COMPLETED_JOBS
= 20
54 def __init__(self
, log
, loop
, max_completed_jobs
=MAX_COMPLETED_JOBS
):
57 self
._job
_id
_gen
= itertools
.count(1)
58 self
._max
_completed
_jobs
= max_completed_jobs
61 self
._completed
_jobs
= collections
.deque(
62 maxlen
=self
._max
_completed
_jobs
67 """ the UploadJobs protobuf message """
68 upload_jobs_msg
= RwImageMgmtYang
.UploadJobs()
69 for job
in self
._jobs
.values():
70 upload_jobs_msg
.job
.append(job
.pb_msg
)
72 return upload_jobs_msg
76 """ the tracked list of ImageUploadJobs """
77 return self
._jobs
.values()
def completed_jobs(self):
    """ Tracked ImageUploadJobs that are also present in the completed-jobs deque

    Returns:
        A new list, ordered the same way the tracked jobs mapping iterates
    """
    finished = self._completed_jobs
    return [tracked for tracked in self._jobs.values() if tracked in finished]
def active_jobs(self):
    """ Tracked ImageUploadJobs that are not present in the completed-jobs deque

    Returns:
        A new list, ordered the same way the tracked jobs mapping iterates
    """
    finished = self._completed_jobs
    return [tracked for tracked in self._jobs.values() if tracked not in finished]
def _add_job(self, job):
    """ Begin tracking the job, keyed by its id

    A job already stored under the same id is replaced.
    """
    job_key = job.id
    self._jobs[job_key] = job
def _start_job(self, job, on_completed=None):
    """ Start the job and register the completion bookkeeping callbacks

    Arguments:
        job - the job to start; job.start() must return an object that
              provides add_done_callback()
        on_completed - optional callback added to the job future after the
                       controller's own bookkeeping callback
    """
    def on_job_completed(_):
        self._log.debug("%s completed. Adding to completed jobs list.", job)

        completed = self._completed_jobs

        # A zero-length completed list cannot retain anything (and has no
        # oldest entry to evict), so simply stop tracking the finished job.
        if completed.maxlen == 0:
            self._jobs.pop(job.id, None)
            return

        # If adding a new completed job is going to overflow the
        # completed job list, find the first job that completed and
        # remove it from the tracked jobs.
        if len(completed) == completed.maxlen:
            first_completed_job = completed[-1]
            del self._jobs[first_completed_job.id]

        completed.appendleft(job)

    job_future = job.start()
    job_future.add_done_callback(on_job_completed)

    if on_completed is not None:
        job_future.add_done_callback(on_completed)
def get_job(self, job_id):
    """ Look up a tracked UploadJob by its job id

    Arguments:
        job_id - the job id that was previously added to the controller

    Returns:
        The associated ImageUploadJob

    Raises:
        LookupError - Could not find the job id
    """
    tracked = self._jobs
    if job_id in tracked:
        return tracked[job_id]

    raise LookupError("Could not find job_id %s" % job_id)
128 def create_job(self
, image_tasks
, on_completed
=None):
129 """ Create and start a ImageUploadJob from a list of ImageUploadTasks
132 image_tasks - a list of ImageUploadTasks
133 on_completed - a callback which is added to the job future
138 self
._log
.debug("Creating new job from %s image tasks", len(image_tasks
))
139 new_job
= ImageUploadJob(
143 job_id
=next(self
._job
_id
_gen
)
146 self
._add
_job
(new_job
)
147 self
._start
_job
(new_job
, on_completed
=on_completed
)
152 class ImageUploadJob(object):
153 """ This class manages a set of ImageUploadTasks
155 In order to push an image (or set of images) to many cloud accounts, and get a single
156 status on that operation, we need a single status that represents all of those tasks.
158 The ImageUploadJob provides a single endpoint to control all the tasks and report
159 when all images are successfully uploaded or when any one fails.
161 STATES
= ("QUEUED", "IN_PROGRESS", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
162 TIMEOUT_JOB
= 6 * 60 * 60 # 6 hours
163 JOB_GEN
= itertools
.count(1)
165 def __init__(self
, log
, loop
, upload_tasks
, job_id
=None, timeout_job
=TIMEOUT_JOB
):
168 self
._upload
_tasks
= upload_tasks
169 self
._job
_id
= next(ImageUploadJob
.JOB_GEN
) if job_id
is None else job_id
170 self
._timeout
_job
= timeout_job
172 self
._state
= "QUEUED"
173 self
._state
_stack
= [self
._state
]
175 self
._start
_time
= time
.time()
178 self
._task
_future
_map
= {}
179 self
._job
_future
= None
182 return "{}(job_id={}, state={})".format(
183 self
.__class
__.__name
__, self
._job
_id
, self
._state
192 """ The state of the ImageUploadJob """
def state(self, new_state):
    """ Move the ImageUploadJob to new_state and record the transition

    Asserts that new_state is one of ImageUploadJob.STATES and that it does
    not come earlier in STATES than the current state, then appends it to
    the state stack and makes it the current state.
    """
    ordering = ImageUploadJob.STATES
    assert new_state in ordering
    assert ordering.index(self._state) <= ordering.index(new_state)

    self._state_stack.append(new_state)
    self._state = new_state
def state_stack(self):
    """ The states this job has progressed through, oldest first

    Returns the internal list itself, not a copy.
    """
    history = self._state_stack
    return history
212 """ The UploadJob protobuf message """
213 task
= RwImageMgmtYang
.UploadJob
.from_dict({
215 "status": self
._state
,
216 "start_time": self
._start
_time
,
217 "upload_tasks": [task
.pb_msg
for task
in self
._upload
_tasks
]
221 task
.stop_time
= self
._stop
_time
225 def _start_upload_tasks(self
):
226 self
._log
.debug("Starting %s upload tasks", len(self
._upload
_tasks
))
228 for upload_task
in self
._upload
_tasks
:
232 def _wait_for_upload_tasks(self
):
233 self
._log
.debug("Waiting for upload tasks to complete")
235 wait_coroutines
= [t
.wait() for t
in self
._upload
_tasks
]
237 yield from asyncio
.wait(
239 timeout
=self
._timeout
_job
,
243 self
._log
.debug("All upload tasks completed")
245 def _set_final_job_state(self
):
247 for task
in self
._upload
_tasks
:
248 if task
.state
!= "COMPLETED":
249 failed_tasks
.append(task
)
252 self
._log
.error("%s had %s FAILED tasks.", self
, len(failed_tasks
))
253 for ftask
in failed_tasks
:
254 self
._log
.error("%s : Failed to upload image : %s to cloud_account : %s", self
, ftask
.image_name
, ftask
.cloud_account
)
255 self
.state
= "FAILED"
257 self
._log
.debug("%s tasks completed successfully", len(self
._upload
_tasks
))
258 self
.state
= "COMPLETED"
261 def _cancel_job(self
):
262 for task
in self
._upload
_tasks
:
265 # TODO: Wait for all tasks to actually reach terminal
268 self
.state
= "CANCELLED"
272 self
.state
= "IN_PROGRESS"
273 self
._start
_upload
_tasks
()
275 yield from self
._wait
_for
_upload
_tasks
()
276 except asyncio
.CancelledError
:
277 self
._log
.debug("%s was cancelled. Cancelling all tasks.",
279 self
._loop
.create_task(self
._cancel
_job
())
282 self
._stop
_time
= time
.time()
283 self
._job
_future
= None
285 self
._set
_final
_job
_state
()
289 """ Wait for the job to reach a terminal state """
290 if self
._job
_future
is None:
291 raise UploadJobError("Job not started")
293 yield from asyncio
.wait_for(
300 """ Start the job and all child tasks """
301 if self
._state
!= "QUEUED":
302 raise UploadJobError("Job already started")
304 self
._job
_future
= self
._loop
.create_task(self
._do
_job
())
305 return self
._job
_future
308 """ Stop the job and all child tasks """
309 if self
._job
_future
is not None:
310 self
.state
= "CANCELLING"
311 self
._job
_future
.cancel()
314 class ByteRateCalculator(object):
315 """ This class produces a byte rate from inputted measurements"""
316 def __init__(self
, rate_time_constant
):
318 self
._time
_constant
= rate_time_constant
324 def add_measurement(self
, num_bytes
, time_delta
):
325 rate
= num_bytes
/ time_delta
329 self
._rate
+= ((rate
- self
._rate
) / self
._time
_constant
)
334 class UploadProgressWriteProxy(object):
335 """ This class implements a write proxy which produces various progress stats
337 In order to keep the complexity of the UploadTask down, this class acts as a
338 proxy for a file write. By providing the original handle to be written to
339 and having the client class call write() on this object, we can produce the
340 various statistics to be consumed.
342 RATE_TIME_CONSTANT
= 5
344 def __init__(self
, log
, loop
, bytes_total
, write_hdl
):
347 self
._bytes
_total
= bytes_total
348 self
._write
_hdl
= write_hdl
350 self
._bytes
_written
= 0
353 self
._rate
_calc
= ByteRateCalculator(UploadProgressWriteProxy
.RATE_TIME_CONSTANT
)
354 self
._rate
_task
= None
def write(self, data):
    """ Forward data to the wrapped write handle and count its length

    Arguments:
        data - the chunk to write; len(data) is added to the running
               bytes-written total after the underlying write returns
    """
    sink = self._write_hdl
    sink.write(data)
    self._bytes_written = self._bytes_written + len(data)
361 self
._write
_hdl
.close()
362 if self
._rate
_task
is not None:
363 self
._log
.debug("stopping rate monitoring task")
364 self
._rate
_task
.cancel()
366 def start_rate_monitoring(self
):
367 """ Start the rate monitoring task """
369 def periodic_rate_task():
371 start_time
= time
.time()
372 start_bytes
= self
._bytes
_written
373 yield from asyncio
.sleep(1, loop
=self
._loop
)
374 time_period
= time
.time() - start_time
375 num_bytes
= self
._bytes
_written
- start_bytes
377 self
._byte
_rate
= self
._rate
_calc
.add_measurement(num_bytes
, time_period
)
379 self
._log
.debug("starting rate monitoring task")
380 self
._rate
_task
= self
._loop
.create_task(periodic_rate_task())
383 def progress_percent(self
):
384 if self
._bytes
_total
== 0:
387 return int(self
._bytes
_written
/ self
._bytes
_total
* 100)
def bytes_written(self):
    """ Running total of bytes passed through write() so far """
    written = self._bytes_written
    return written
def bytes_total(self):
    """ The total byte count this proxy was constructed with """
    total = self._bytes_total
    return total
def bytes_rate(self):
    """ The most recent byte rate stored by the rate monitoring task """
    current_rate = self._byte_rate
    return current_rate
402 class GlanceImagePipeGen(object):
403 """ This class produces a read file handle from a generator that produces bytes
405 The CAL API takes a file handle as an input. The Glance API creates a generator
406 that produces byte strings. This class acts as the mediator by creating a pipe
407 and pumping the bytestring from the generator into the write side of the pipe.
409 A pipe has the useful feature here that it will block at the buffer size until
410 the reader has consumed. This allows us to only pull from glance and push at the
411 pace of the reader preventing us from having to store the images locally on disk.
413 def __init__(self
, log
, loop
, data_gen
):
416 self
._data
_gen
= data_gen
418 read_fd
, write_fd
= os
.pipe()
420 self
._read
_hdl
= os
.fdopen(read_fd
, 'rb')
421 self
._write
_hdl
= os
.fdopen(write_fd
, 'wb')
422 self
._close
_hdl
= self
._write
_hdl
426 return self
._write
_hdl
def write_hdl(self, new_write_hdl):
    """ Replace the handle that generator data is written to

    Arguments:
        new_write_hdl - the object to use as the write handle from now on
    """
    replacement = new_write_hdl
    self._write_hdl = replacement
434 return self
._read
_hdl
436 def _gen_writer(self
):
437 self
._log
.debug("starting image data write to pipe")
439 for data
in self
._data
_gen
:
441 self
._write
_hdl
.write(data
)
442 except (BrokenPipeError
, ValueError) as e
:
443 self
._log
.warning("write pipe closed: %s", str(e
))
446 except Exception as e
:
447 self
._log
.exception("error when writing data to pipe: %s", str(e
))
450 self
._log
.debug("closing write side of pipe")
452 self
._write
_hdl
.close()
457 t
= threading
.Thread(target
=self
._gen
_writer
)
462 self
._log
.debug("stop requested, closing write side of pipe")
463 self
._write
_hdl
.close()
466 class AccountImageUploadTask(object):
467 """ This class manages a create_image task from an image info and file handle
469 Manage the upload of a image to a configured cloud account.
471 STATES
= ("QUEUED", "CHECK_IMAGE_EXISTS", "UPLOADING", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
473 TIMEOUT_CHECK_EXISTS
= 10
474 TIMEOUT_IMAGE_UPLOAD
= 6 * 60 * 60 # 6 hours
476 def __init__(self
, log
, loop
, account
, image_info
, image_hdl
,
477 timeout_exists
=TIMEOUT_CHECK_EXISTS
, timeout_upload
=TIMEOUT_IMAGE_UPLOAD
,
478 progress_info
=None, write_canceller
=None
482 self
._account
= account
483 self
._image
_info
= image_info
.deep_copy()
484 self
._image
_hdl
= image_hdl
486 self
._timeout
_exists
= timeout_exists
487 self
._timeout
_upload
= timeout_upload
489 self
._progress
_info
= progress_info
490 self
._write
_canceller
= write_canceller
492 self
._state
= "QUEUED"
493 self
._state
_stack
= [self
._state
]
495 self
._detail
= "Task is waiting to be started"
496 self
._start
_time
= time
.time()
498 self
._upload
_future
= None
500 if not image_info
.has_field("name"):
501 raise ValueError("image info must have name field")
def state(self, new_state):
    """ Move the upload task to new_state and record the transition

    Asserts that new_state is one of AccountImageUploadTask.STATES and that
    it does not come earlier in STATES than the current state, then appends
    it to the state stack and makes it the current state.
    """
    ordering = AccountImageUploadTask.STATES
    assert new_state in ordering
    assert ordering.index(self._state) <= ordering.index(new_state)

    self._state_stack.append(new_state)
    self._state = new_state
def state_stack(self):
    """ The states this task has progressed through, oldest first

    Returns the internal list itself, not a copy.
    """
    history = self._state_stack
    return history
522 """ The image id being uploaded """
523 return self
._image
_info
.id
def image_name(self):
    """ The name field of the image info being uploaded """
    info = self._image_info
    return info.name
531 def image_checksum(self
):
532 """ The image checksum being uploaded """
533 if self
._image
_info
.has_field("checksum"):
534 return self
._image
_info
.checksum
def cloud_account(self):
    """ The name of the cloud account which the image is being uploaded to """
    target_account = self._account
    return target_account.name
545 """ The UploadTask protobuf message """
546 task
= RwImageMgmtYang
.UploadTask
.from_dict({
547 "cloud_account": self
.cloud_account
,
548 "image_id": self
.image_id
,
549 "image_name": self
.image_name
,
550 "status": self
.state
,
551 "detail": self
._detail
,
552 "start_time": self
._start
_time
,
555 if self
.image_checksum
is not None:
556 task
.image_checksum
= self
.image_checksum
559 task
.stop_time
= self
._stop
_time
561 if self
._progress
_info
:
562 task
.bytes_written
= self
._progress
_info
.bytes_written
563 task
.bytes_total
= self
._progress
_info
.bytes_total
564 task
.progress_percent
= self
._progress
_info
.progress_percent
565 task
.bytes_per_second
= self
._progress
_info
.bytes_rate
567 if self
.state
== "COMPLETED":
568 task
.progress_percent
= 100
572 def _get_account_images(self
):
574 self
._log
.debug("getting image list for account {}".format(self
._account
.name
))
576 account_images
= self
._account
.get_image_list()
577 except rift
.mano
.cloud
.CloudAccountCalError
as e
:
578 msg
= "could not get image list for account {}".format(self
._account
.name
)
580 raise ImageListError(msg
) from e
582 return account_images
584 def _has_existing_image(self
):
585 account
= self
._account
587 account_images
= self
._get
_account
_images
()
589 matching_images
= [i
for i
in account_images
if i
.name
== self
.image_name
]
591 if self
.image_checksum
is not None:
592 matching_images
= [i
for i
in matching_images
if i
.checksum
== self
.image_checksum
]
595 self
._log
.debug("found matching image with checksum in account %s",
599 self
._log
.debug("did not find matching image with checksum in account %s",
603 def _upload_image(self
):
604 image
= self
._image
_info
605 account
= self
._account
607 image
.fileno
= self
._image
_hdl
.fileno()
609 self
._log
.debug("uploading to account {}: {}".format(account
.name
, image
))
611 image
.id = account
.create_image(image
)
612 except rift
.mano
.cloud
.CloudAccountCalError
as e
:
613 msg
= "error when uploading image {} to cloud account: {}".format(image
.name
, str(e
))
615 raise ImageUploadError(msg
) from e
617 self
._log
.debug('uploaded image (id: {}) to account{}: {}'.format(
618 image
.id, account
.name
, image
.name
))
623 def _do_upload(self
):
625 self
.state
= "CHECK_IMAGE_EXISTS"
626 has_image
= yield from asyncio
.wait_for(
627 self
._loop
.run_in_executor(None, self
._has
_existing
_image
),
628 timeout
=self
._timeout
_exists
,
632 self
.state
= "COMPLETED"
633 self
._detail
= "Image already exists on destination"
636 self
.state
= "UPLOADING"
637 self
._detail
= "Uploading image"
639 # Note that if the upload times out, the upload thread may still
640 # stick around. We'll need another method of cancelling the task
641 # through the VALA interface.
642 image_id
= yield from asyncio
.wait_for(
643 self
._loop
.run_in_executor(None, self
._upload
_image
),
644 timeout
=self
._timeout
_upload
,
648 except asyncio
.CancelledError
as e
:
649 self
.state
= "CANCELLED"
650 self
._detail
= "Image upload cancelled"
652 except ImageUploadTaskError
as e
:
653 self
.state
= "FAILED"
654 self
._detail
= str(e
)
656 except asyncio
.TimeoutError
as e
:
657 self
.state
= "FAILED"
658 self
._detail
= "Timed out during upload task: %s" % str(e
)
661 # If the user does not provide a checksum and performs a URL source
662 # upload with an incorrect URL, then Glance does not indicate a failure
663 # and the CAL cannot detect an incorrect upload. In this case, use
664 # the bytes_written to detect a bad upload and mark the task as failed.
665 if self
._progress
_info
and self
._progress
_info
.bytes_written
== 0:
666 self
.state
= "FAILED"
667 self
._detail
= "No bytes written. Possible bad image source."
670 self
.state
= "COMPLETED"
671 self
._detail
= "Image successfully uploaded. Image id: %s" % image_id
674 self
._stop
_time
= time
.time()
675 self
._upload
_future
= None
679 """ Wait for the upload task to complete """
680 if self
._upload
_future
is None:
681 raise ImageUploadError("Task not started")
683 yield from asyncio
.wait_for(
685 self
._timeout
_upload
, loop
=self
._loop
689 """ Start the upload task """
690 if self
._state
!= "QUEUED":
691 raise ImageUploadError("Task already started")
693 self
._log
.info("Starting %s", self
)
695 self
._upload
_future
= self
._loop
.create_task(self
._do
_upload
())
697 return self
._upload
_future
700 """ Stop the upload task in progress """
701 if self
._upload
_future
is None:
702 self
._log
.warning("Cannot cancel %s. Not in progress.", self
)
705 self
.state
= "CANCELLING"
706 self
._detail
= "Cancellation has been requested"
708 self
._log
.info("Cancelling %s", self
)
709 self
._upload
_future
.cancel()
710 if self
._write
_canceller
is not None:
711 self
._write
_canceller
.stop()