c1716d3cd321b053924017d3faed985e85b91343
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / upload.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import collections
20 import itertools
21 import os
22 import time
23 import threading
24
25 import rift.mano.cloud
26
27 import gi
28 gi.require_version('RwImageMgmtYang', '1.0')
29 from gi.repository import (
30 RwImageMgmtYang,
31 )
32
33
class UploadJobError(Exception):
    """Raised when an ImageUploadJob is misused, e.g. waited on before start or started twice."""
36
37
class ImageUploadTaskError(Exception):
    """Base class for the errors raised by an image upload task."""
40
41
class ImageUploadError(ImageUploadTaskError):
    """Raised when an image upload fails or an upload task is misused (not started / started twice)."""
44
45
class ImageListError(ImageUploadTaskError):
    """Raised when the image list of a cloud account cannot be retrieved."""
48
49
class ImageUploadJobController(object):
    """ This class starts and manages ImageUploadJobs

    Every created job is tracked by id.  When a job finishes it is recorded in a
    bounded list of completed jobs; once that list is full, the oldest completed
    job is dropped from tracking so the controller does not grow without bound.
    """
    MAX_COMPLETED_JOBS = 20

    def __init__(self, log, loop, max_completed_jobs=MAX_COMPLETED_JOBS):
        """
        Arguments:
            log - logger used for debug output
            loop - the asyncio event loop handed to every created job
            max_completed_jobs - how many finished jobs remain tracked (0 keeps none)
        """
        self._log = log
        self._loop = loop
        self._job_id_gen = itertools.count(1)
        self._max_completed_jobs = max_completed_jobs

        # job id -> ImageUploadJob, for running and (recently) completed jobs
        self._jobs = {}
        # Newest completed job on the left, oldest on the right.
        self._completed_jobs = collections.deque(
            maxlen=self._max_completed_jobs
        )

    @property
    def pb_msg(self):
        """ the UploadJobs protobuf message """
        upload_jobs_msg = RwImageMgmtYang.UploadJobs()
        for job in self._jobs.values():
            upload_jobs_msg.job.append(job.pb_msg)

        return upload_jobs_msg

    @property
    def jobs(self):
        """ the tracked list of ImageUploadJobs """
        return self._jobs.values()

    @property
    def completed_jobs(self):
        """ completed jobs in the tracked list of ImageUploadJobs """
        return [job for job in self._jobs.values() if job in self._completed_jobs]

    @property
    def active_jobs(self):
        """ in-progress jobs in the tracked list of ImageUploadJobs """
        return [job for job in self._jobs.values() if job not in self._completed_jobs]

    def _add_job(self, job):
        self._jobs[job.id] = job

    def _start_job(self, job, on_completed=None):
        """ Start a job and record it as completed once its future is done

        Arguments:
            job - the job to start; its start() must return a future
            on_completed - optional extra done-callback for the job future
        """
        def on_job_completed(_):
            self._log.debug("%s completed. Adding to completed jobs list.", job)

            completed = self._completed_jobs
            if completed.maxlen == 0:
                # No completed jobs are retained at all: stop tracking this
                # one right away (indexing the empty deque would fail).
                self._jobs.pop(job.id, None)
                return

            # If adding a new completed job is going to overflow the
            # completed job list, find the first job that completed (the
            # right-most entry, as new ones are added on the left) and
            # remove it from the tracked jobs.
            if len(completed) == completed.maxlen:
                first_completed_job = completed[-1]
                self._jobs.pop(first_completed_job.id, None)

            completed.appendleft(job)

        job_future = job.start()
        job_future.add_done_callback(on_job_completed)

        if on_completed is not None:
            job_future.add_done_callback(on_completed)

    def get_job(self, job_id):
        """ Get the UploadJob from the job id

        Arguments:
            job_id - the job id that was previously added to the controller

        Returns:
            The associated ImageUploadJob

        Raises:
            LookupError - Could not find the job id
        """
        try:
            return self._jobs[job_id]
        except KeyError:
            raise LookupError("Could not find job_id %s" % job_id) from None

    def create_job(self, image_tasks, on_completed=None):
        """ Create and start a ImageUploadJob from a list of ImageUploadTasks

        Arguments:
            image_tasks - a list of ImageUploadTasks
            on_completed - a callback which is added to the job future

        Returns:
            A ImageUploadJob id
        """
        self._log.debug("Creating new job from %s image tasks", len(image_tasks))
        new_job = ImageUploadJob(
            self._log,
            self._loop,
            image_tasks,
            job_id=next(self._job_id_gen)
        )

        self._add_job(new_job)
        self._start_job(new_job, on_completed=on_completed)

        return new_job.id
150
151
class ImageUploadJob(object):
    """ This class manages a set of ImageUploadTasks

    In order to push an image (or set of images) to many cloud accounts, and get a single
    status on that operation, we need a single status that represents all of those tasks.

    The ImageUploadJob provides a single endpoint to control all the tasks and report
    when all images are successfully uploaded or when any one fails.
    """
    # The state setter only allows staying put or moving forward in this order.
    STATES = ("QUEUED", "IN_PROGRESS", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
    TIMEOUT_JOB = 6 * 60 * 60  # 6 hours
    # Fallback id source used when no explicit job_id is given.
    JOB_GEN = itertools.count(1)

    def __init__(self, log, loop, upload_tasks, job_id=None, timeout_job=TIMEOUT_JOB):
        """
        Arguments:
            log - logger
            loop - the asyncio event loop the job runs on
            upload_tasks - the upload tasks managed by this job
            job_id - explicit job id; drawn from JOB_GEN when None
            timeout_job - seconds to wait for all upload tasks to finish
        """
        self._log = log
        self._loop = loop
        self._upload_tasks = upload_tasks
        self._job_id = next(ImageUploadJob.JOB_GEN) if job_id is None else job_id
        self._timeout_job = timeout_job

        self._state = "QUEUED"
        self._state_stack = [self._state]

        self._start_time = time.time()
        self._stop_time = 0

        # NOTE(review): not read anywhere in this class — confirm it is still needed.
        self._task_future_map = {}
        # Set by start(); cleared again when the job finishes or is cancelled.
        self._job_future = None

    def __repr__(self):
        return "{}(job_id={}, state={})".format(
            self.__class__.__name__, self._job_id, self._state
        )

    @property
    def id(self):
        return self._job_id

    @property
    def state(self):
        """ The state of the ImageUploadJob """
        return self._state

    @state.setter
    def state(self, new_state):
        """ Set the state of the ImageUploadJob (never moves backwards in STATES) """
        states = ImageUploadJob.STATES
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state

    @property
    def state_stack(self):
        """ The list of states that this job progressed through """
        return self._state_stack

    @property
    def pb_msg(self):
        """ The UploadJob protobuf message """
        job_msg = RwImageMgmtYang.UploadJob.from_dict({
            "id": self._job_id,
            "status": self._state,
            "start_time": self._start_time,
            "upload_tasks": [task.pb_msg for task in self._upload_tasks]
        })

        if self._stop_time:
            job_msg.stop_time = self._stop_time

        return job_msg

    def _start_upload_tasks(self):
        self._log.debug("Starting %s upload tasks", len(self._upload_tasks))

        for upload_task in self._upload_tasks:
            upload_task.start()

    @asyncio.coroutine
    def _wait_for_upload_tasks(self):
        """ Wait (up to timeout_job seconds) for every upload task to finish """
        self._log.debug("Waiting for upload tasks to complete")

        wait_coroutines = [t.wait() for t in self._upload_tasks]
        if wait_coroutines:
            yield from asyncio.wait(
                wait_coroutines,
                timeout=self._timeout_job,
                loop=self._loop
            )

        self._log.debug("All upload tasks completed")

    def _set_final_job_state(self):
        """ COMPLETED if every task reached COMPLETED, otherwise FAILED """
        failed_tasks = []
        for task in self._upload_tasks:
            if task.state != "COMPLETED":
                failed_tasks.append(task)

        if failed_tasks:
            self._log.error("%s had %s FAILED tasks.", self, len(failed_tasks))
            for ftask in failed_tasks:
                self._log.error("%s : Failed to upload image : %s to cloud_account : %s", self, ftask.image_name, ftask.cloud_account)
            self.state = "FAILED"
        else:
            self._log.debug("%s tasks completed successfully", len(self._upload_tasks))
            self.state = "COMPLETED"

    @asyncio.coroutine
    def _cancel_job(self):
        for task in self._upload_tasks:
            task.stop()

        # TODO: Wait for all tasks to actually reach terminal
        # states.

        self.state = "CANCELLED"

    @asyncio.coroutine
    def _do_job(self):
        """ Run all upload tasks and record the final job state """
        self.state = "IN_PROGRESS"
        self._start_upload_tasks()
        try:
            yield from self._wait_for_upload_tasks()
        except asyncio.CancelledError:
            self._log.debug("%s was cancelled. Cancelling all tasks.",
                            self)
            # Child tasks are stopped from a separate task; re-raising here
            # leaves the job future in the cancelled state.
            self._loop.create_task(self._cancel_job())
            raise
        finally:
            self._stop_time = time.time()
            self._job_future = None

        self._set_final_job_state()

    @asyncio.coroutine
    def wait(self):
        """ Wait for the job to reach a terminal state

        Raises:
            UploadJobError - the job is not running (never started or already finished)
        """
        if self._job_future is None:
            raise UploadJobError("Job not started")

        yield from asyncio.wait_for(
            self._job_future,
            self._timeout_job,
            loop=self._loop
        )

    def start(self):
        """ Start the job and all child tasks

        Returns:
            The asyncio task running the job

        Raises:
            UploadJobError - the job has already left the QUEUED state
        """
        if self._state != "QUEUED":
            raise UploadJobError("Job already started")

        self._job_future = self._loop.create_task(self._do_job())
        return self._job_future

    def stop(self):
        """ Stop the job and all child tasks

        Does nothing if the job is not running.
        """
        if self._job_future is None:
            return

        # _do_job() switches to IN_PROGRESS in its very first step, so a job
        # still QUEUED here has not begun executing.
        not_yet_running = self._state == "QUEUED"

        self.state = "CANCELLING"
        self._job_future.cancel()

        if not_yet_running:
            # Cancelling a task before its first step means the coroutine body
            # never runs: no upload task was started, and neither _do_job()
            # nor _cancel_job() will move the job out of CANCELLING.  Finish
            # the cancellation here instead of leaving the job stuck.
            self._stop_time = time.time()
            self._job_future = None
            self.state = "CANCELLED"
312
313
class ByteRateCalculator(object):
    """ This class produces a byte rate from inputted measurements

    The rate is smoothed with an exponential moving average: each measurement
    moves the current rate 1/rate_time_constant of the way towards the measured
    rate.  While the current rate is 0 (e.g. before any data has been seen) the
    next measurement is taken as-is.
    """
    def __init__(self, rate_time_constant):
        """
        Arguments:
            rate_time_constant - smoothing divisor; must be positive

        Raises:
            ValueError - rate_time_constant is not positive
        """
        if rate_time_constant <= 0:
            raise ValueError(
                "rate_time_constant must be positive, got %r" % (rate_time_constant,)
            )
        self._rate = 0
        self._time_constant = rate_time_constant

    @property
    def rate(self):
        """ The current smoothed rate (bytes per unit of time_delta) """
        return self._rate

    def add_measurement(self, num_bytes, time_delta):
        """ Fold in a measurement of num_bytes transferred over time_delta

        Returns:
            The updated smoothed rate
        """
        if time_delta <= 0:
            # No elapsed time (or a clock that went backwards): the sample
            # carries no rate information and would divide by zero or yield
            # a negative rate, so keep the current estimate.
            return self._rate

        rate = num_bytes / time_delta
        if self._rate == 0:
            self._rate = rate
        else:
            self._rate += ((rate - self._rate) / self._time_constant)

        return self._rate
332
333
class UploadProgressWriteProxy(object):
    """ This class implements a write proxy which produces various progress stats

    In order to keep the complexity of the UploadTask down, this class acts as a
    proxy for a file write. By providing the original handle to be written to
    and having the client class call write() on this object, we can produce the
    various statistics to be consumed.
    """
    RATE_TIME_CONSTANT = 5

    def __init__(self, log, loop, bytes_total, write_hdl):
        """
        Arguments:
            log - logger
            loop - the asyncio event loop the rate monitoring task runs on
            bytes_total - expected total number of bytes (0 if unknown)
            write_hdl - the handle all writes are forwarded to
        """
        self._log = log
        self._loop = loop
        self._bytes_total = bytes_total
        self._write_hdl = write_hdl

        self._bytes_written = 0
        self._byte_rate = 0

        self._rate_calc = ByteRateCalculator(UploadProgressWriteProxy.RATE_TIME_CONSTANT)
        self._rate_task = None

    def write(self, data):
        """ Forward data to the wrapped handle and count the bytes written """
        self._write_hdl.write(data)
        self._bytes_written += len(data)

    def close(self):
        """ Close the wrapped handle and stop rate monitoring

        The rate monitoring task is cancelled even if closing the handle raises.
        """
        try:
            self._write_hdl.close()
        finally:
            if self._rate_task is not None:
                self._log.debug("stopping rate monitoring task")
                self._rate_task.cancel()
                self._rate_task = None

    def start_rate_monitoring(self):
        """ Start the rate monitoring task (no-op if it is already running) """
        if self._rate_task is not None and not self._rate_task.done():
            # A second sampler would feed duplicate measurements into the
            # moving average.
            self._log.debug("rate monitoring task already running")
            return

        @asyncio.coroutine
        def periodic_rate_task():
            while True:
                # Monotonic clock: wall-clock adjustments could otherwise
                # produce negative or inflated rates.
                start_time = time.monotonic()
                start_bytes = self._bytes_written
                yield from asyncio.sleep(1, loop=self._loop)
                time_period = time.monotonic() - start_time
                num_bytes = self._bytes_written - start_bytes

                self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)

        self._log.debug("starting rate monitoring task")
        self._rate_task = self._loop.create_task(periodic_rate_task())

    @property
    def progress_percent(self):
        """ Integer percentage of bytes_total written, capped at 100 (0 if total unknown) """
        if not self._bytes_total:
            return 0

        return min(100, int(self._bytes_written / self._bytes_total * 100))

    @property
    def bytes_written(self):
        return self._bytes_written

    @property
    def bytes_total(self):
        return self._bytes_total

    @property
    def bytes_rate(self):
        return self._byte_rate
400
401
class GlanceImagePipeGen(object):
    """ This class produces a read file handle from a generator that produces bytes

    The CAL API takes a file handle as an input. The Glance API creates a generator
    that produces byte strings. This class acts as the mediator by creating a pipe
    and pumping the bytestring from the generator into the write side of the pipe.

    A pipe has the useful feature here that it will block at the buffer size until
    the reader has consumed. This allows us to only pull from glance and push at the
    pace of the reader preventing us from having to store the images locally on disk.
    """
    def __init__(self, log, loop, data_gen):
        """
        Arguments:
            log - logger
            loop - event loop (stored; not used by this class)
            data_gen - iterable yielding the byte strings to pump into the pipe
        """
        self._log = log
        self._loop = loop
        self._data_gen = data_gen

        read_fd, write_fd = os.pipe()

        self._read_hdl = os.fdopen(read_fd, 'rb')
        self._write_hdl = os.fdopen(write_fd, 'wb')
        # The original pipe write handle, kept even if write_hdl is replaced.
        # NOTE(review): not read anywhere in this class — confirm it is needed.
        self._close_hdl = self._write_hdl

    @property
    def write_hdl(self):
        return self._write_hdl

    @write_hdl.setter
    def write_hdl(self, new_write_hdl):
        # Allows callers to interpose a wrapper (anything with write/close).
        self._write_hdl = new_write_hdl

    @property
    def read_hdl(self):
        return self._read_hdl

    def _gen_writer(self):
        """ Thread body: copy data_gen into the pipe, then close the write side """
        self._log.debug("starting image data write to pipe")
        try:
            for data in self._data_gen:
                try:
                    self._write_hdl.write(data)
                except (BrokenPipeError, ValueError) as e:
                    # Reader went away, or the handle was closed by stop().
                    self._log.warning("write pipe closed: %s", str(e))
                    return

        except Exception as e:
            self._log.exception("error when writing data to pipe: %s", str(e))

        finally:
            self._log.debug("closing write side of pipe")
            try:
                self._write_hdl.close()
            except OSError:
                pass

    def start(self):
        """ Start pumping data_gen into the pipe on a daemon thread """
        t = threading.Thread(target=self._gen_writer)
        t.daemon = True
        t.start()

    def stop(self):
        """ Close the write side of the pipe

        Closing flushes any buffered data; if the reader has already gone away
        that flush fails with an OSError (e.g. BrokenPipeError).  The writer
        thread tolerates the same error when it closes the handle, so it is
        logged here as well instead of propagating into the caller's
        cancellation path.
        """
        self._log.debug("stop requested, closing write side of pipe")
        try:
            self._write_hdl.close()
        except OSError as e:
            self._log.warning("error closing write side of pipe: %s", str(e))
464
465
class AccountImageUploadTask(object):
    """ This class manages a create_image task from an image info and file handle

    Manage the upload of an image to a configured cloud account.
    """
    # The state setter only allows staying put or moving forward in this order.
    STATES = ("QUEUED", "CHECK_IMAGE_EXISTS", "UPLOADING", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")

    TIMEOUT_CHECK_EXISTS = 10
    TIMEOUT_IMAGE_UPLOAD = 6 * 60 * 60  # 6 hours

    def __init__(self, log, loop, account, image_info, image_hdl,
                 timeout_exists=TIMEOUT_CHECK_EXISTS, timeout_upload=TIMEOUT_IMAGE_UPLOAD,
                 progress_info=None, write_canceller=None
                 ):
        """
        Arguments:
            log - logger
            loop - the asyncio event loop the task runs on
            account - target cloud account (name, get_image_list(), create_image())
            image_info - image description; deep-copied, must have a "name" field
            image_hdl - handle whose fileno() is passed along to create_image()
            timeout_exists - seconds allowed for the existing-image check
            timeout_upload - seconds allowed for the upload itself
            progress_info - optional object exposing bytes_written, bytes_total,
                            progress_percent and bytes_rate
            write_canceller - optional object whose stop() is called on cancel

        Raises:
            ValueError - image_info has no "name" field
        """
        self._log = log
        self._loop = loop
        self._account = account
        self._image_info = image_info.deep_copy()
        self._image_hdl = image_hdl

        self._timeout_exists = timeout_exists
        self._timeout_upload = timeout_upload

        self._progress_info = progress_info
        self._write_canceller = write_canceller

        self._state = "QUEUED"
        self._state_stack = [self._state]

        self._detail = "Task is waiting to be started"
        self._start_time = time.time()
        self._stop_time = 0
        # Set by start(); cleared again when the task finishes or is cancelled.
        self._upload_future = None

        if not image_info.has_field("name"):
            raise ValueError("image info must have name field")

    def __repr__(self):
        return "{}(image_name={}, cloud_account={}, state={})".format(
            self.__class__.__name__, self.image_name, self.cloud_account, self._state
        )

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, new_state):
        states = AccountImageUploadTask.STATES
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state

    @property
    def state_stack(self):
        return self._state_stack

    @property
    def image_id(self):
        """ The id of the image being uploaded """
        return self._image_info.id

    @property
    def image_name(self):
        """ The image name being uploaded """
        return self._image_info.name

    @property
    def image_checksum(self):
        """ The image checksum being uploaded (None if not provided) """
        if self._image_info.has_field("checksum"):
            return self._image_info.checksum

        return None

    @property
    def cloud_account(self):
        """ The cloud account name which the image is being uploaded to """
        return self._account.name

    @property
    def pb_msg(self):
        """ The UploadTask protobuf message """
        task = RwImageMgmtYang.UploadTask.from_dict({
            "cloud_account": self.cloud_account,
            "image_id": self.image_id,
            "image_name": self.image_name,
            "status": self.state,
            "detail": self._detail,
            "start_time": self._start_time,
        })

        if self.image_checksum is not None:
            task.image_checksum = self.image_checksum

        if self._stop_time:
            task.stop_time = self._stop_time

        if self._progress_info:
            task.bytes_written = self._progress_info.bytes_written
            task.bytes_total = self._progress_info.bytes_total
            task.progress_percent = self._progress_info.progress_percent
            task.bytes_per_second = self._progress_info.bytes_rate

        if self.state == "COMPLETED":
            task.progress_percent = 100

        return task

    def _get_account_images(self):
        """ Return the account's image list; raises ImageListError on CAL errors """
        account_images = []
        self._log.debug("getting image list for account {}".format(self._account.name))
        try:
            account_images = self._account.get_image_list()
        except rift.mano.cloud.CloudAccountCalError as e:
            msg = "could not get image list for account {}".format(self._account.name)
            self._log.error(msg)
            raise ImageListError(msg) from e

        return account_images

    def _has_existing_image(self):
        """ True if the account has an image with this name (and checksum, if set) """
        account = self._account

        account_images = self._get_account_images()

        matching_images = [i for i in account_images if i.name == self.image_name]

        if self.image_checksum is not None:
            matching_images = [i for i in matching_images if i.checksum == self.image_checksum]

        if matching_images:
            self._log.debug("found matching image with checksum in account %s",
                            account.name)
            return True

        self._log.debug("did not find matching image with checksum in account %s",
                        account.name)
        return False

    def _upload_image(self):
        """ Blocking upload via account.create_image(); returns the new image id

        Raises:
            ImageUploadError - the CAL reported an error
        """
        image = self._image_info
        account = self._account

        image.fileno = self._image_hdl.fileno()

        self._log.debug("uploading to account {}: {}".format(account.name, image))
        try:
            image.id = account.create_image(image)
        except rift.mano.cloud.CloudAccountCalError as e:
            msg = "error when uploading image {} to cloud account: {}".format(image.name, str(e))
            self._log.error(msg)
            raise ImageUploadError(msg) from e

        self._log.debug('uploaded image (id: {}) to account {}: {}'.format(
            image.id, account.name, image.name))

        return image.id

    @asyncio.coroutine
    def _do_upload(self):
        """ Check for an existing image, upload if needed and record the outcome

        Cancellation, expected errors and unexpected Exceptions are all recorded
        as a terminal state (CANCELLED / FAILED) instead of propagating.
        """
        try:
            self.state = "CHECK_IMAGE_EXISTS"
            has_image = yield from asyncio.wait_for(
                self._loop.run_in_executor(None, self._has_existing_image),
                timeout=self._timeout_exists,
                loop=self._loop
            )
            if has_image:
                self.state = "COMPLETED"
                self._detail = "Image already exists on destination"
                return

            self.state = "UPLOADING"
            self._detail = "Uploading image"

            # Note that if the upload times out, the upload thread may still
            # stick around. We'll need another method of cancelling the task
            # through the VALA interface.
            image_id = yield from asyncio.wait_for(
                self._loop.run_in_executor(None, self._upload_image),
                timeout=self._timeout_upload,
                loop=self._loop
            )

        except asyncio.CancelledError:
            self.state = "CANCELLED"
            self._detail = "Image upload cancelled"

        except ImageUploadTaskError as e:
            self.state = "FAILED"
            self._detail = str(e)

        except asyncio.TimeoutError as e:
            self.state = "FAILED"
            self._detail = "Timed out during upload task: %s" % str(e)

        except Exception as e:
            # Task boundary: an unexpected error (e.g. from the account object)
            # must not leave the task in a non-terminal state.
            self._log.exception("%s : unexpected error during upload", self)
            self.state = "FAILED"
            self._detail = "Unexpected error during upload task: %s" % str(e)

        else:
            # If the user does not provide a checksum and performs a URL source
            # upload with an incorrect URL, then Glance does not indicate a failure
            # and the CAL cannot detect an incorrect upload. In this case, use
            # the bytes_written to detect a bad upload and mark the task as failed.
            if self._progress_info and self._progress_info.bytes_written == 0:
                self.state = "FAILED"
                self._detail = "No bytes written. Possible bad image source."
                return

            self.state = "COMPLETED"
            self._detail = "Image successfully uploaded. Image id: %s" % image_id

        finally:
            self._stop_time = time.time()
            self._upload_future = None

    @asyncio.coroutine
    def wait(self):
        """ Wait for the upload task to complete

        Raises:
            ImageUploadError - the task is not running (never started or already finished)
        """
        if self._upload_future is None:
            raise ImageUploadError("Task not started")

        yield from asyncio.wait_for(
            self._upload_future,
            self._timeout_upload, loop=self._loop
        )

    def start(self):
        """ Start the upload task

        Returns:
            The asyncio task running the upload

        Raises:
            ImageUploadError - the task has already left the QUEUED state
        """
        if self._state != "QUEUED":
            raise ImageUploadError("Task already started")

        self._log.info("Starting %s", self)

        self._upload_future = self._loop.create_task(self._do_upload())

        return self._upload_future

    def stop(self):
        """ Stop the upload task in progress """
        if self._upload_future is None:
            self._log.warning("Cannot cancel %s. Not in progress.", self)
            return

        # _do_upload() leaves QUEUED in its very first step, so a task still
        # QUEUED here has not begun executing.
        not_yet_running = self._state == "QUEUED"

        self.state = "CANCELLING"
        self._detail = "Cancellation has been requested"

        self._log.info("Cancelling %s", self)
        self._upload_future.cancel()

        if not_yet_running:
            # A task cancelled before its first step never runs its body, so
            # _do_upload() cannot record the outcome; do it here instead of
            # leaving the task stuck in CANCELLING.
            self.state = "CANCELLED"
            self._detail = "Image upload cancelled"
            self._stop_time = time.time()
            self._upload_future = None

        if self._write_canceller is not None:
            self._write_canceller.stop()