7ce74b22c76e1abb61795d032ef3574922fa498e
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / upload.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import collections
20 import itertools
21 import os
22 import time
23 import threading
24
25 import rift.mano.cloud
26
27 import gi
28 gi.require_version('RwImageMgmtYang', '1.0')
29 from gi.repository import (
30 RwImageMgmtYang,
31 )
32
33
class UploadJobError(Exception):
    """ Raised by ImageUploadJob when it is started twice or waited on before being started """
36
37
class ImageUploadTaskError(Exception):
    """ Base class for the errors raised while running an image upload task """
40
41
class ImageUploadError(ImageUploadTaskError):
    """ Raised when creating the image in a cloud account fails, or when an
    upload task is started twice or waited on before being started """
44
45
class ImageListError(ImageUploadTaskError):
    """ Raised when the image list of a cloud account cannot be retrieved """
48
49
class ImageUploadJobController(object):
    """ This class starts and manages ImageUploadJobs

    Every created job is tracked by id.  When a job finishes it is recorded in
    a bounded history of completed jobs; when that history overflows, the
    oldest completed job is dropped from the tracked jobs as well.
    """
    MAX_COMPLETED_JOBS = 20

    def __init__(self, log, loop, max_completed_jobs=MAX_COMPLETED_JOBS):
        """
        Arguments:
            log - logger
            loop - event loop handed to every created ImageUploadJob
            max_completed_jobs - how many completed jobs stay tracked
                                 (0 keeps none, None keeps all)
        """
        self._log = log
        self._loop = loop
        self._job_id_gen = itertools.count(1)
        self._max_completed_jobs = max_completed_jobs

        # job id -> job, for every job still being tracked
        self._jobs = {}
        # Most recently completed job on the left, oldest on the right.
        self._completed_jobs = collections.deque(
            maxlen=self._max_completed_jobs
        )

    @property
    def pb_msg(self):
        """ the UploadJobs protobuf message """
        upload_jobs_msg = RwImageMgmtYang.UploadJobs()
        for job in self._jobs.values():
            upload_jobs_msg.job.append(job.pb_msg)

        return upload_jobs_msg

    @property
    def jobs(self):
        """ the tracked list of ImageUploadJobs """
        return self._jobs.values()

    @property
    def completed_jobs(self):
        """ completed jobs in the tracked list of ImageUploadJobs """
        completed = set(self._completed_jobs)
        return [job for job in self._jobs.values() if job in completed]

    @property
    def active_jobs(self):
        """ in-progress jobs in the tracked list of ImageUploadJobs """
        completed = set(self._completed_jobs)
        return [job for job in self._jobs.values() if job not in completed]

    def _add_job(self, job):
        self._jobs[job.id] = job

    def _start_job(self, job, on_completed=None):
        """ Start the job and record it as completed once its future is done """
        def on_job_completed(_):
            self._log.debug("%s completed. Adding to completed jobs list.", job)

            maxlen = self._completed_jobs.maxlen
            if maxlen == 0:
                # No completed-job history is kept at all, so stop tracking
                # this job right away (indexing the empty deque would raise).
                self._jobs.pop(job.id, None)
                return

            # If adding a new completed job is going to overflow the
            # completed job list, find the first job that completed (the
            # rightmost entry) and remove it from the tracked jobs; the
            # appendleft() below evicts it from the deque itself.
            if len(self._completed_jobs) == maxlen:
                first_completed_job = self._completed_jobs[-1]
                self._jobs.pop(first_completed_job.id, None)

            self._completed_jobs.appendleft(job)

        job_future = job.start()
        job_future.add_done_callback(on_job_completed)

        if on_completed is not None:
            job_future.add_done_callback(on_completed)

    def get_job(self, job_id):
        """ Get the UploadJob from the job id

        Arguments:
            job_id - the job id that was previously added to the controller

        Returns:
            The associated ImageUploadJob

        Raises:
            LookupError - Could not find the job id
        """
        try:
            return self._jobs[job_id]
        except KeyError:
            raise LookupError("Could not find job_id %s" % job_id) from None

    def create_job(self, image_tasks, on_completed=None):
        """ Create and start a ImageUploadJob from a list of ImageUploadTasks

        Arguments:
            image_tasks - a list of ImageUploadTasks
            on_completed - a callback which is added to the job future

        Returns:
            A ImageUploadJob id
        """
        self._log.debug("Creating new job from %s image tasks", len(image_tasks))
        new_job = ImageUploadJob(
            self._log,
            self._loop,
            image_tasks,
            job_id=next(self._job_id_gen)
        )

        self._add_job(new_job)
        self._start_job(new_job, on_completed=on_completed)

        return new_job.id
150
151
class ImageUploadJob(object):
    """ This class manages a set of ImageUploadTasks

    In order to push an image (or set of images) to many cloud accounts, and get a single
    status on that operation, we need a single status that represents all of those tasks.

    The ImageUploadJob provides a single endpoint to control all the tasks and report
    when all images are successfully uploaded or when any one fails.
    """
    STATES = ("QUEUED", "IN_PROGRESS", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
    TIMEOUT_JOB = 6 * 60 * 60  # 6 hours
    JOB_GEN = itertools.count(1)

    def __init__(self, log, loop, upload_tasks, job_id=None, timeout_job=TIMEOUT_JOB):
        """
        Arguments:
            log - logger
            loop - asyncio event loop the job runs on
            upload_tasks - the upload tasks this job starts and waits for
            job_id - explicit job id; drawn from JOB_GEN when None
            timeout_job - seconds to wait for the upload tasks / the job
        """
        self._log = log
        self._loop = loop
        self._upload_tasks = upload_tasks
        self._job_id = next(ImageUploadJob.JOB_GEN) if job_id is None else job_id
        self._timeout_job = timeout_job

        self._state = "QUEUED"
        self._state_stack = [self._state]

        self._start_time = time.time()
        # Set once the job reaches a terminal state; 0 means "not finished".
        self._stop_time = 0

        # Not used by this class itself.
        self._task_future_map = {}
        # Future of the running _do_job coroutine; None before start() and
        # again once the job has finished.
        self._job_future = None

    def __repr__(self):
        return "{}(job_id={}, state={})".format(
            self.__class__.__name__, self._job_id, self._state
        )

    @property
    def id(self):
        return self._job_id

    @property
    def state(self):
        """ The state of the ImageUploadJob """
        return self._state

    @state.setter
    def state(self, new_state):
        """ Set the state of the ImageUploadJob (states may only move forward in STATES) """
        states = ImageUploadJob.STATES
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state

    @property
    def state_stack(self):
        """ The list of states that this job progressed through """
        return self._state_stack

    @property
    def pb_msg(self):
        """ The UploadJob protobuf message """
        job_msg = RwImageMgmtYang.UploadJob.from_dict({
            "id": self._job_id,
            "status": self._state,
            "start_time": self._start_time,
            "upload_tasks": [task.pb_msg for task in self._upload_tasks]
        })

        if self._stop_time:
            job_msg.stop_time = self._stop_time

        return job_msg

    def _start_upload_tasks(self):
        self._log.debug("Starting %s upload tasks", len(self._upload_tasks))

        for upload_task in self._upload_tasks:
            upload_task.start()

    @asyncio.coroutine
    def _wait_for_upload_tasks(self):
        self._log.debug("Waiting for upload tasks to complete")

        wait_coroutines = [t.wait() for t in self._upload_tasks]
        if wait_coroutines:
            # On timeout asyncio.wait simply returns; tasks that did not
            # reach COMPLETED are then counted as failed.
            yield from asyncio.wait(
                wait_coroutines,
                timeout=self._timeout_job,
                loop=self._loop
            )

        self._log.debug("All upload tasks completed")

    def _set_final_job_state(self):
        failed_tasks = [task for task in self._upload_tasks if task.state != "COMPLETED"]

        if failed_tasks:
            self._log.error("%s had %s FAILED tasks.", self, len(failed_tasks))
            self.state = "FAILED"
        else:
            self._log.debug("%s tasks completed successfully", len(self._upload_tasks))
            self.state = "COMPLETED"

    @asyncio.coroutine
    def _cancel_job(self):
        for task in self._upload_tasks:
            try:
                task.stop()
            except Exception:
                # Keep going so one misbehaving task cannot prevent the
                # remaining tasks from being stopped.
                self._log.exception("Error while stopping %s", task)

        # TODO: Wait for all tasks to actually reach terminal
        # states.

        self.state = "CANCELLED"

    @asyncio.coroutine
    def _do_job(self):
        self.state = "IN_PROGRESS"
        self._start_upload_tasks()
        try:
            yield from self._wait_for_upload_tasks()
        except asyncio.CancelledError:
            self._log.debug("%s was cancelled. Cancelling all tasks.",
                            self)
            self._loop.create_task(self._cancel_job())
            raise
        finally:
            self._stop_time = time.time()
            self._job_future = None

        self._set_final_job_state()

    @asyncio.coroutine
    def wait(self):
        """ Wait for the job to reach a terminal state

        Returns immediately if the job has already finished.

        Raises:
            UploadJobError - the job was never started
        """
        if self._job_future is None:
            if self._stop_time:
                # _do_job (or a stop() issued before it ran) already
                # finished the job.
                return
            raise UploadJobError("Job not started")

        yield from asyncio.wait_for(
            self._job_future,
            self._timeout_job,
            loop=self._loop
        )

    def start(self):
        """ Start the job and all child tasks """
        if self._state != "QUEUED":
            raise UploadJobError("Job already started")

        self._job_future = self._loop.create_task(self._do_job())
        return self._job_future

    def stop(self):
        """ Stop the job and all child tasks """
        if self._job_future is None:
            return

        # The state is still QUEUED only while _do_job has not executed its
        # first statement.  Cancelling it now means it never runs at all, so
        # it cannot record the outcome itself; do that here instead.
        not_started = self._state == "QUEUED"

        self.state = "CANCELLING"
        self._job_future.cancel()

        if not_started:
            self._stop_time = time.time()
            self._job_future = None
            self.state = "CANCELLED"
310
311
class ByteRateCalculator(object):
    """ This class produces a byte rate from inputted measurements

    The rate is smoothed with an exponential moving average: each measurement
    moves the current rate 1/rate_time_constant of the way towards the
    measured rate.  While the current rate is exactly zero (initially), a
    measurement replaces it outright.
    """
    def __init__(self, rate_time_constant):
        self._rate = 0
        self._time_constant = rate_time_constant

    @property
    def rate(self):
        """ The current smoothed rate """
        return self._rate

    def add_measurement(self, num_bytes, time_delta):
        """ Fold one measurement into the rate and return the updated rate

        Arguments:
            num_bytes - number of bytes transferred during the interval
            time_delta - length of the interval

        A non-positive time_delta carries no rate information (and would
        divide by zero), so it is ignored and the current rate is returned.
        """
        if time_delta <= 0:
            return self._rate

        rate = num_bytes / time_delta
        if self._rate == 0:
            self._rate = rate
        else:
            self._rate += ((rate - self._rate) / self._time_constant)

        return self._rate
330
331
class UploadProgressWriteProxy(object):
    """ This class implements a write proxy which produces various progress stats

    In order to keep the complexity of the UploadTask down, this class acts as a
    proxy for a file write. By providing the original handle to be written to
    and having the client class call write() on this object, we can produce the
    various statistics to be consumed.
    """
    # Smoothing constant handed to ByteRateCalculator.
    RATE_TIME_CONSTANT = 5

    def __init__(self, log, loop, bytes_total, write_hdl):
        """
        Arguments:
            log - logger
            loop - event loop the rate monitoring task runs on
            bytes_total - expected total number of bytes (0 if unknown)
            write_hdl - handle every write() is forwarded to
        """
        self._log = log
        self._loop = loop
        self._bytes_total = bytes_total
        self._write_hdl = write_hdl

        self._bytes_written = 0
        self._byte_rate = 0

        self._rate_calc = ByteRateCalculator(UploadProgressWriteProxy.RATE_TIME_CONSTANT)
        self._rate_task = None

    def write(self, data):
        """ Forward data to the wrapped handle and count the bytes written """
        self._write_hdl.write(data)
        self._bytes_written += len(data)

    def close(self):
        """ Close the wrapped handle and stop rate monitoring

        The rate task is cancelled even if closing the handle raises, so a
        failed close cannot leave the periodic task running forever.
        """
        try:
            self._write_hdl.close()
        finally:
            if self._rate_task is not None:
                self._log.debug("stopping rate monitoring task")
                self._rate_task.cancel()
                self._rate_task = None

    def start_rate_monitoring(self):
        """ Start the rate monitoring task

        Once per second the bytes written during that second are fed to the
        rate calculator.  Does nothing if a monitoring task is already
        running, so repeated calls cannot orphan a task close() would miss.
        """
        if self._rate_task is not None and not self._rate_task.done():
            self._log.debug("rate monitoring task already running")
            return

        @asyncio.coroutine
        def periodic_rate_task():
            while True:
                start_time = time.time()
                start_bytes = self._bytes_written
                yield from asyncio.sleep(1, loop=self._loop)
                time_period = time.time() - start_time
                num_bytes = self._bytes_written - start_bytes

                self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)

        self._log.debug("starting rate monitoring task")
        self._rate_task = self._loop.create_task(periodic_rate_task())

    @property
    def progress_percent(self):
        """ Integer percentage of bytes_total written so far (0 if total is 0) """
        if self._bytes_total == 0:
            return 0

        return int(self._bytes_written / self._bytes_total * 100)

    @property
    def bytes_written(self):
        return self._bytes_written

    @property
    def bytes_total(self):
        return self._bytes_total

    @property
    def bytes_rate(self):
        """ Latest smoothed byte rate from the monitoring task """
        return self._byte_rate
398
399
class GlanceImagePipeGen(object):
    """ This class produces a read file handle from a generator that produces bytes

    The CAL API takes a file handle as an input. The Glance API creates a generator
    that produces byte strings. This class acts as the mediator by creating a pipe
    and pumping the bytestring from the generator into the write side of the pipe.

    A pipe has the useful feature here that it will block at the buffer size until
    the reader has consumed. This allows us to only pull from glance and push at the
    pace of the reader preventing us from having to store the images locally on disk.
    """
    def __init__(self, log, loop, data_gen):
        self._log = log
        self._loop = loop
        self._data_gen = data_gen

        read_fd, write_fd = os.pipe()

        self._read_hdl = os.fdopen(read_fd, 'rb')
        self._write_hdl = os.fdopen(write_fd, 'wb')
        # The original pipe write handle, kept even if write_hdl is replaced.
        # Not otherwise used by this class.
        self._close_hdl = self._write_hdl

    @property
    def write_hdl(self):
        """ The handle the generator's data is written to """
        return self._write_hdl

    @write_hdl.setter
    def write_hdl(self, new_write_hdl):
        """ Replace the write handle (presumably with a wrapper around the pipe) """
        self._write_hdl = new_write_hdl

    @property
    def read_hdl(self):
        """ The read side of the pipe, for the consumer """
        return self._read_hdl

    def _gen_writer(self):
        """ Thread body: copy every chunk of data_gen into write_hdl, then close it """
        self._log.debug("starting image data write to pipe")
        try:
            for data in self._data_gen:
                try:
                    self._write_hdl.write(data)
                except (BrokenPipeError, ValueError) as e:
                    # BrokenPipeError: the reader went away.
                    # ValueError: the handle was closed (e.g. by stop()).
                    self._log.warning("write pipe closed: %s", str(e))
                    return

        except Exception as e:
            self._log.exception("error when writing data to pipe: %s", str(e))

        finally:
            # Closing the write side is what signals EOF to the reader.
            self._log.debug("closing write side of pipe")
            try:
                self._write_hdl.close()
            except OSError:
                pass

    def start(self):
        """ Pump the generator into the pipe on a daemon thread """
        t = threading.Thread(target=self._gen_writer)
        t.daemon = True
        t.start()

    def stop(self):
        """ Close the write side of the pipe to abort the transfer

        Closing is best-effort, like the close in _gen_writer: an OSError
        (e.g. BrokenPipeError while flushing into a pipe whose reader is gone)
        is logged instead of raised, so a caller stopping several uploads is
        not interrupted by one of them.
        """
        self._log.debug("stop requested, closing write side of pipe")
        try:
            self._write_hdl.close()
        except OSError as e:
            self._log.warning("error closing write side of pipe: %s", str(e))
462
463
class AccountImageUploadTask(object):
    """ This class manages a create_image task from an image info and file handle

    Manage the upload of an image to a configured cloud account.  The task first
    checks whether an image with the same name (and, when known, the same
    checksum) already exists in the account, and only uploads when it does not.
    """
    STATES = ("QUEUED", "CHECK_IMAGE_EXISTS", "UPLOADING", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")

    TIMEOUT_CHECK_EXISTS = 10
    TIMEOUT_IMAGE_UPLOAD = 6 * 60 * 60  # 6 hours

    def __init__(self, log, loop, account, image_info, image_hdl,
                 timeout_exists=TIMEOUT_CHECK_EXISTS, timeout_upload=TIMEOUT_IMAGE_UPLOAD,
                 progress_info=None, write_canceller=None
                 ):
        """
        Arguments:
            log - logger
            loop - asyncio event loop the task runs on
            account - cloud account the image is uploaded to
            image_info - image description; must have a "name" field.  A deep
                         copy is kept, so the caller's object is not modified.
            image_hdl - file handle whose fileno() is passed to create_image
            timeout_exists - seconds allowed for the existing-image check
            timeout_upload - seconds allowed for the upload itself
            progress_info - optional object exposing bytes_written, bytes_total,
                            progress_percent and bytes_rate for reporting
            write_canceller - optional object whose stop() is called on cancel

        Raises:
            ValueError - image_info has no name field
        """
        self._log = log
        self._loop = loop
        self._account = account
        self._image_info = image_info.deep_copy()
        self._image_hdl = image_hdl

        self._timeout_exists = timeout_exists
        self._timeout_upload = timeout_upload

        self._progress_info = progress_info
        self._write_canceller = write_canceller

        self._state = "QUEUED"
        self._state_stack = [self._state]

        self._detail = "Task is waiting to be started"
        self._start_time = time.time()
        # Set once the task reaches a terminal state; 0 means "not finished".
        self._stop_time = 0
        # Future of the running _do_upload coroutine; None before start() and
        # again once the task has finished.
        self._upload_future = None

        if not image_info.has_field("name"):
            raise ValueError("image info must have name field")

    def __repr__(self):
        return "{}(account={}, image={}, state={})".format(
            self.__class__.__name__, self._account.name, self._image_info.name, self._state
        )

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, new_state):
        """ Set the task state (states may only move forward in STATES) """
        states = AccountImageUploadTask.STATES
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state

    @property
    def state_stack(self):
        """ The list of states that this task progressed through """
        return self._state_stack

    @property
    def image_id(self):
        """ The id of the image being uploaded """
        return self._image_info.id

    @property
    def image_name(self):
        """ The image name being uploaded """
        return self._image_info.name

    @property
    def image_checksum(self):
        """ The image checksum being uploaded, or None if not provided """
        if self._image_info.has_field("checksum"):
            return self._image_info.checksum

        return None

    @property
    def cloud_account(self):
        """ The cloud account name which the image is being uploaded to """
        return self._account.name

    @property
    def pb_msg(self):
        """ The UploadTask protobuf message """
        task_msg = RwImageMgmtYang.UploadTask.from_dict({
            "cloud_account": self.cloud_account,
            "image_id": self.image_id,
            "image_name": self.image_name,
            "status": self.state,
            "detail": self._detail,
            "start_time": self._start_time,
        })

        if self.image_checksum is not None:
            task_msg.image_checksum = self.image_checksum

        if self._stop_time:
            task_msg.stop_time = self._stop_time

        if self._progress_info:
            task_msg.bytes_written = self._progress_info.bytes_written
            task_msg.bytes_total = self._progress_info.bytes_total
            task_msg.progress_percent = self._progress_info.progress_percent
            task_msg.bytes_per_second = self._progress_info.bytes_rate

        if self.state == "COMPLETED":
            task_msg.progress_percent = 100

        return task_msg

    def _get_account_images(self):
        """ Return the account's image list (runs in an executor thread)

        Raises:
            ImageListError - the CAL could not list the account's images
        """
        self._log.debug("getting image list for account {}".format(self._account.name))
        try:
            return self._account.get_image_list()
        except rift.mano.cloud.CloudAccountCalError as e:
            msg = "could not get image list for account {}".format(self._account.name)
            self._log.error(msg)
            raise ImageListError(msg) from e

    def _has_existing_image(self):
        """ True if the account already holds an image with this name (and checksum, if set) """
        account = self._account

        account_images = self._get_account_images()

        matching_images = [i for i in account_images if i.name == self.image_name]

        if self.image_checksum is not None:
            matching_images = [i for i in matching_images if i.checksum == self.image_checksum]

        if matching_images:
            self._log.debug("found matching image with checksum in account %s",
                            account.name)
            return True

        self._log.debug("did not find matching image with checksum in account %s",
                        account.name)
        return False

    def _upload_image(self):
        """ Create the image in the account and return its id (runs in an executor thread)

        Raises:
            ImageUploadError - the CAL failed to create the image
        """
        image = self._image_info
        account = self._account

        image.fileno = self._image_hdl.fileno()

        self._log.debug("uploading to account {}: {}".format(account.name, image))
        try:
            image.id = account.create_image(image)
        except rift.mano.cloud.CloudAccountCalError as e:
            msg = "error when uploading image {} to cloud account: {}".format(image.name, str(e))
            self._log.error(msg)
            raise ImageUploadError(msg) from e

        self._log.debug('uploaded image (id: {}) to account {}: {}'.format(
            image.id, account.name, image.name))

        return image.id

    @asyncio.coroutine
    def _do_upload(self):
        """ Run the existence check and the upload, always ending in a terminal state """
        try:
            self.state = "CHECK_IMAGE_EXISTS"
            has_image = yield from asyncio.wait_for(
                self._loop.run_in_executor(None, self._has_existing_image),
                timeout=self._timeout_exists,
                loop=self._loop
            )
            if has_image:
                self.state = "COMPLETED"
                self._detail = "Image already exists on destination"
                return

            self.state = "UPLOADING"
            self._detail = "Uploading image"

            # Note that if the upload times out, the upload thread may still
            # stick around. We'll need another method of cancelling the task
            # through the VALA interface.
            image_id = yield from asyncio.wait_for(
                self._loop.run_in_executor(None, self._upload_image),
                timeout=self._timeout_upload,
                loop=self._loop
            )

        except asyncio.CancelledError:
            self.state = "CANCELLED"
            self._detail = "Image upload cancelled"

        except ImageUploadTaskError as e:
            self.state = "FAILED"
            self._detail = str(e)

        except asyncio.TimeoutError as e:
            self.state = "FAILED"
            self._detail = "Timed out during upload task: %s" % str(e)

        except Exception as e:
            # Anything unexpected must still leave the task in a terminal
            # state; otherwise it would report CHECK_IMAGE_EXISTS/UPLOADING
            # forever.
            self._log.exception("%s failed with an unexpected error", self)
            self.state = "FAILED"
            self._detail = "Unexpected error during upload task: %s" % str(e)

        else:
            # If the user does not provide a checksum and performs a URL source
            # upload with an incorrect URL, then Glance does not indicate a failure
            # and the CAL cannot detect an incorrect upload. In this case, use
            # the bytes_written to detect a bad upload and mark the task as failed.
            if self._progress_info and self._progress_info.bytes_written == 0:
                self.state = "FAILED"
                self._detail = "No bytes written. Possible bad image source."
                return

            self.state = "COMPLETED"
            self._detail = "Image successfully uploaded. Image id: %s" % image_id

        finally:
            self._stop_time = time.time()
            self._upload_future = None

    @asyncio.coroutine
    def wait(self):
        """ Wait for the upload task to complete

        Returns immediately if the task has already finished.

        Raises:
            ImageUploadError - the task was never started
        """
        if self._upload_future is None:
            if self._stop_time:
                # _do_upload (or a stop() issued before it ran) already
                # finished the task.
                return
            raise ImageUploadError("Task not started")

        yield from asyncio.wait_for(
            self._upload_future,
            self._timeout_upload, loop=self._loop
        )

    def start(self):
        """ Start the upload task """
        if self._state != "QUEUED":
            raise ImageUploadError("Task already started")

        self._log.info("Starting %s", self)

        self._upload_future = self._loop.create_task(self._do_upload())

        return self._upload_future

    def stop(self):
        """ Stop the upload task in progress """
        if self._upload_future is None:
            self._log.warning("Cannot cancel %s. Not in progress.", self)
            return

        # The state is still QUEUED only while _do_upload has not executed its
        # first statement.  Cancelling it now means it never runs at all, so
        # it cannot record the outcome itself; do that here instead.
        not_started = self._state == "QUEUED"

        self.state = "CANCELLING"
        self._detail = "Cancellation has been requested"

        self._log.info("Cancelling %s", self)
        self._upload_future.cancel()

        if not_started:
            self.state = "CANCELLED"
            self._detail = "Image upload cancelled"
            self._stop_time = time.time()
            self._upload_future = None

        if self._write_canceller is not None:
            self._write_canceller.stop()