update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / upload.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import collections
20 import itertools
21 import os
22 import time
23 import threading
24
25 import rift.mano.cloud
26
27 import gi
28 gi.require_version('RwImageMgmtYang', '1.0')
29 from gi.repository import (
30 RwImageMgmtYang,
31 )
32
33
class UploadJobError(Exception):
    """ Raised on invalid ImageUploadJob operations (e.g. starting twice, waiting before start). """


class ImageUploadTaskError(Exception):
    """ Common base class for errors raised by image upload tasks. """


class ImageUploadError(ImageUploadTaskError):
    """ Raised when an image upload fails, or an upload task is started/waited on incorrectly. """


class ImageListError(ImageUploadTaskError):
    """ Raised when the image list of a cloud account cannot be retrieved. """
48
49
class ImageUploadJobController(object):
    """ This class starts and manages ImageUploadJobs

    Every job created through the controller is tracked by its id.  When a job
    finishes it is recorded in a bounded history of completed jobs; once the
    history is full, the oldest completed job is dropped from tracking
    altogether to make room for the newly completed one.
    """
    MAX_COMPLETED_JOBS = 20

    def __init__(self, project, max_completed_jobs=MAX_COMPLETED_JOBS):
        """
        Arguments:
            project - object providing the ``log`` and ``loop`` used by the
                      controller and the jobs it creates
            max_completed_jobs - number of completed jobs that stay tracked
                                 (0 keeps none, None keeps all)
        """
        self._log = project.log
        self._loop = project.loop
        self._project = project
        self._job_id_gen = itertools.count(1)
        self._max_completed_jobs = max_completed_jobs

        # job id -> job, for every job still being tracked (active or completed)
        self._jobs = {}
        # Completed jobs, newest on the left and oldest on the right.
        self._completed_jobs = collections.deque(
            maxlen=self._max_completed_jobs
        )

    @property
    def pb_msg(self):
        """ the UploadJobs protobuf message """
        upload_jobs_msg = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs()
        for job in self._jobs.values():
            upload_jobs_msg.job.append(job.pb_msg)

        return upload_jobs_msg

    @property
    def jobs(self):
        """ the tracked list of ImageUploadJobs """
        return self._jobs.values()

    @property
    def completed_jobs(self):
        """ completed jobs in the tracked list of ImageUploadJobs """
        return [job for job in self._jobs.values() if job in self._completed_jobs]

    @property
    def active_jobs(self):
        """ in-progress jobs in the tracked list of ImageUploadJobs """
        return [job for job in self._jobs.values() if job not in self._completed_jobs]

    def _add_job(self, job):
        """ Start tracking ``job`` under its id. """
        self._jobs[job.id] = job

    def _start_job(self, job, on_completed=None):
        """ Start ``job`` and arrange for it to be recorded as completed when its future is done.

        Arguments:
            job - the job to start; its start() must return a future
            on_completed - optional callback added to the job future after
                           the controller's own bookkeeping callback
        """
        def on_job_completed(_):
            self._log.debug("%s completed. Adding to completed jobs list.", job)

            history_size = self._completed_jobs.maxlen
            if history_size == 0:
                # No completed jobs are retained at all.  Indexing the (always
                # empty) history below would raise IndexError and leave this
                # job tracked as active forever, so stop tracking it now.
                self._jobs.pop(job.id, None)
                return

            # If adding a new completed job is going to overflow the
            # completed job list, find the first job that completed and
            # remove it from the tracked jobs.  The deque itself drops that
            # entry automatically on appendleft() because of its maxlen.
            if len(self._completed_jobs) == history_size:
                oldest_completed_job = self._completed_jobs[-1]
                self._jobs.pop(oldest_completed_job.id, None)

            self._completed_jobs.appendleft(job)

        job_future = job.start()
        job_future.add_done_callback(on_job_completed)

        if on_completed is not None:
            job_future.add_done_callback(on_completed)

    def get_job(self, job_id):
        """ Get the UploadJob from the job id

        Arguments:
            job_id - the job id that was previously added to the controller

        Returns:
            The associated ImageUploadJob

        Raises:
            LookupError - Could not find the job id
        """
        try:
            return self._jobs[job_id]
        except KeyError:
            raise LookupError("Could not find job_id %s" % job_id) from None

    def create_job(self, image_tasks, on_completed=None):
        """ Create and start a ImageUploadJob from a list of ImageUploadTasks

        Arguments:
            image_tasks - a list of ImageUploadTasks
            on_completed - a callback which is added to the job future

        Returns:
            A ImageUploadJob id
        """
        self._log.debug("Creating new job from %s image tasks", len(image_tasks))
        new_job = ImageUploadJob(
            self._log,
            self._loop,
            image_tasks,
            job_id=next(self._job_id_gen)
        )

        self._add_job(new_job)
        self._start_job(new_job, on_completed=on_completed)

        return new_job.id
151
152
class ImageUploadJob(object):
    """ This class manages a set of ImageUploadTasks

    In order to push an image (or set of images) to many cloud accounts, and get a single
    status on that operation, we need a single status that represents all of those tasks.

    The ImageUploadJob provides a single endpoint to control all the tasks and report
    when all images are successfully uploaded or when any one fails.

    The job state only ever moves forward through STATES; the state setter
    asserts this.
    """
    STATES = ("QUEUED", "IN_PROGRESS", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
    TIMEOUT_JOB = 6 * 60 * 60  # 6 hours
    JOB_GEN = itertools.count(1)

    # Upload-task states in which a task is already finished or already being
    # stopped, so stopping it again after a job timeout is unnecessary.
    _TASK_STOPPING_OR_DONE_STATES = ("CANCELLING", "CANCELLED", "COMPLETED", "FAILED")

    def __init__(self, log, loop, upload_tasks, job_id=None, timeout_job=TIMEOUT_JOB):
        """
        Arguments:
            log - logger
            loop - asyncio event loop the job runs on
            upload_tasks - the upload tasks managed by this job
            job_id - explicit job id; a class-wide counter is used when None
            timeout_job - seconds to wait for the upload tasks / the job
        """
        self._log = log
        self._loop = loop
        self._upload_tasks = upload_tasks
        self._job_id = next(ImageUploadJob.JOB_GEN) if job_id is None else job_id
        self._timeout_job = timeout_job

        self._state = "QUEUED"
        self._state_stack = [self._state]

        self._start_time = time.time()
        self._stop_time = 0

        self._task_future_map = {}
        # Future of the running _do_job task; None before start and after it finishes.
        self._job_future = None

    def __repr__(self):
        return "{}(job_id={}, state={})".format(
            self.__class__.__name__, self._job_id, self._state
        )

    @property
    def id(self):
        return self._job_id

    @property
    def state(self):
        """ The state of the ImageUploadJob """
        return self._state

    @state.setter
    def state(self, new_state):
        """ Set the state of the ImageUploadJob (forward transitions only) """
        states = ImageUploadJob.STATES
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state

    @property
    def state_stack(self):
        """ The list of states that this job progressed through """
        return self._state_stack

    @property
    def pb_msg(self):
        """ The UploadJob protobuf message """
        job_msg = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs_Job.from_dict({
            "id": self._job_id,
            "status": self._state,
            "start_time": self._start_time,
            "upload_tasks": [upload_task.pb_msg for upload_task in self._upload_tasks]
        })

        if self._stop_time:
            job_msg.stop_time = self._stop_time

        return job_msg

    def _start_upload_tasks(self):
        self._log.debug("Starting %s upload tasks", len(self._upload_tasks))

        for upload_task in self._upload_tasks:
            upload_task.start()

    @asyncio.coroutine
    def _wait_for_upload_tasks(self):
        """ Wait, up to the job timeout, for every upload task to finish.

        asyncio.wait() does not cancel what is still pending when its timeout
        expires, so without intervention unfinished uploads would keep running
        after the job has given up on them.  They are stopped explicitly here.
        """
        self._log.debug("Waiting for upload tasks to complete")

        wait_coroutines = [t.wait() for t in self._upload_tasks]
        if wait_coroutines:
            _, pending = yield from asyncio.wait(
                wait_coroutines,
                timeout=self._timeout_job,
                loop=self._loop
            )
            if pending:
                self._stop_unfinished_upload_tasks(len(pending))
                return

        self._log.debug("All upload tasks completed")

    def _stop_unfinished_upload_tasks(self, num_pending):
        """ Stop every upload task that is neither finished nor already stopping. """
        self._log.warning("%s timed out with %s upload tasks still running. Stopping them.",
                          self, num_pending)
        for upload_task in self._upload_tasks:
            if upload_task.state not in ImageUploadJob._TASK_STOPPING_OR_DONE_STATES:
                upload_task.stop()

    def _set_final_job_state(self):
        """ Mark the job COMPLETED if every task completed, FAILED otherwise. """
        failed_tasks = [task for task in self._upload_tasks if task.state != "COMPLETED"]

        if failed_tasks:
            self._log.error("%s had %s FAILED tasks.", self, len(failed_tasks))
            for ftask in failed_tasks:
                self._log.error("%s : Failed to upload image : %s to cloud_account : %s", self, ftask.image_name, ftask.cloud_account)
            self.state = "FAILED"
        else:
            self._log.debug("%s tasks completed successfully", len(self._upload_tasks))
            self.state = "COMPLETED"

    @asyncio.coroutine
    def _cancel_job(self):
        for task in self._upload_tasks:
            task.stop()

        # TODO: Wait for all tasks to actually reach terminal
        # states.

        self.state = "CANCELLED"

    @asyncio.coroutine
    def _do_job(self):
        self.state = "IN_PROGRESS"
        self._start_upload_tasks()
        try:
            yield from self._wait_for_upload_tasks()
        except asyncio.CancelledError:
            self._log.debug("%s was cancelled. Cancelling all tasks.",
                            self)
            self._loop.create_task(self._cancel_job())
            raise
        finally:
            self._stop_time = time.time()
            self._job_future = None

        self._set_final_job_state()

    @asyncio.coroutine
    def wait(self):
        """ Wait for the job to reach a terminal state

        Raises:
            UploadJobError - the job is not running (never started or already finished)
        """
        if self._job_future is None:
            raise UploadJobError("Job not started")

        yield from asyncio.wait_for(
            self._job_future,
            self._timeout_job,
            loop=self._loop
        )

    def start(self):
        """ Start the job and all child tasks

        Returns:
            The future of the task running the job

        Raises:
            UploadJobError - the job was already started
        """
        if self._state != "QUEUED":
            raise UploadJobError("Job already started")

        self._job_future = self._loop.create_task(self._do_job())
        return self._job_future

    def stop(self):
        """ Stop the job and all child tasks

        Does nothing when the job is not running.
        """
        if self._job_future is None:
            return

        # _do_job switches to IN_PROGRESS in its very first step, so QUEUED
        # here means the job task exists but has not run yet.
        not_yet_running = self._state == "QUEUED"

        self.state = "CANCELLING"
        self._job_future.cancel()

        if not_yet_running:
            # A task cancelled before its first step never executes its
            # coroutine body, so neither the cancellation handling nor the
            # finally clause of _do_job will run.  No upload tasks have been
            # started yet, so complete the cancellation bookkeeping here.
            self._stop_time = time.time()
            self._job_future = None
            self.state = "CANCELLED"
313
314
class ByteRateCalculator(object):
    """ This class produces a byte rate from inputted measurements

    The rate is smoothed as an exponential moving average.  While the current
    rate is 0, a new measurement replaces it outright.  Otherwise the rate moves
    1/rate_time_constant of the way toward each new measurement.
    """
    def __init__(self, rate_time_constant):
        """
        Arguments:
            rate_time_constant - smoothing divisor; larger values react more slowly
        """
        self._rate = 0
        self._time_constant = rate_time_constant

    @property
    def rate(self):
        """ The current smoothed rate in bytes per second """
        return self._rate

    def add_measurement(self, num_bytes, time_delta):
        """ Fold a measurement of num_bytes over time_delta seconds into the rate

        A non-positive time_delta (e.g. caused by a clock adjustment) carries no
        usable rate information.  It is ignored instead of dividing by zero or
        recording a negative rate.

        Returns:
            The updated smoothed rate
        """
        if time_delta <= 0:
            return self._rate

        rate = num_bytes / time_delta
        if self._rate == 0:
            self._rate = rate
        else:
            self._rate += ((rate - self._rate) / self._time_constant)

        return self._rate
333
334
class UploadProgressWriteProxy(object):
    """ This class implements a write proxy which produces various progress stats

    In order to keep the complexity of the UploadTask down, this class acts as a
    proxy for a file write. By providing the original handle to be written to
    and having the client class call write() on this object, we can produce the
    various statistics to be consumed.
    """
    RATE_TIME_CONSTANT = 5

    def __init__(self, log, loop, bytes_total, write_hdl):
        """
        Arguments:
            log - logger
            loop - asyncio event loop used for the rate monitoring task
            bytes_total - expected total number of bytes (0 if unknown)
            write_hdl - handle that write()/close() are forwarded to
        """
        self._log = log
        self._loop = loop
        self._bytes_total = bytes_total
        self._write_hdl = write_hdl

        self._bytes_written = 0
        self._byte_rate = 0

        self._rate_calc = ByteRateCalculator(UploadProgressWriteProxy.RATE_TIME_CONSTANT)
        self._rate_task = None

    def write(self, data):
        """ Write data to the wrapped handle and count the bytes written """
        self._write_hdl.write(data)
        self._bytes_written += len(data)

    def close(self):
        """ Close the wrapped handle and stop rate monitoring, if it is running

        close() may run outside the event loop thread.  For example, if this
        proxy is installed as GlanceImagePipeGen.write_hdl, that class's writer
        thread closes it.  asyncio tasks are not thread-safe, so the
        cancellation is handed to the loop with call_soon_threadsafe().  The
        monitoring task is stopped even if closing the handle raises.
        """
        try:
            self._write_hdl.close()
        finally:
            rate_task, self._rate_task = self._rate_task, None
            if rate_task is not None:
                self._log.debug("stopping rate monitoring task")
                self._loop.call_soon_threadsafe(rate_task.cancel)

    def start_rate_monitoring(self):
        """ Start the rate monitoring task

        Roughly once per second, the bytes written during the elapsed interval
        are fed into the rate calculator.  Calling this while monitoring is
        already running is a no-op, so the first task is never orphaned.
        """
        if self._rate_task is not None and not self._rate_task.done():
            self._log.debug("rate monitoring task already running")
            return

        @asyncio.coroutine
        def periodic_rate_task():
            try:
                while True:
                    # Use a monotonic clock: wall-clock adjustments would
                    # otherwise produce bogus (even negative) intervals.
                    start_time = time.monotonic()
                    start_bytes = self._bytes_written
                    yield from asyncio.sleep(1, loop=self._loop)
                    time_period = time.monotonic() - start_time
                    num_bytes = self._bytes_written - start_bytes

                    self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)
            except asyncio.CancelledError:
                self._log.debug("rate monitoring task cancelled")

        self._log.debug("starting rate monitoring task")
        self._rate_task = self._loop.create_task(periodic_rate_task())

    @property
    def progress_percent(self):
        """ Integer percentage of bytes_total written so far (0 when the total is 0) """
        if self._bytes_total == 0:
            return 0

        return int(self._bytes_written / self._bytes_total * 100)

    @property
    def bytes_written(self):
        return self._bytes_written

    @property
    def bytes_total(self):
        return self._bytes_total

    @property
    def bytes_rate(self):
        """ The most recent smoothed write rate in bytes per second """
        return self._byte_rate
404
405
class GlanceImagePipeGen(object):
    """ Adapt a generator of byte strings into a readable file handle.

    The CAL API consumes a file handle, while the Glance API hands back a
    generator of byte strings.  This adapter opens an OS pipe and, on a
    background daemon thread, copies every chunk produced by the generator
    into the write end of the pipe.  Consumers read from ``read_hdl``.

    A pipe blocks once its buffer is full, so the writer thread pulls data from
    Glance only as fast as the reader drains the pipe.  The image therefore
    never has to be staged on local disk.
    """
    def __init__(self, log, loop, data_gen):
        self._log = log
        self._loop = loop
        self._data_gen = data_gen

        pipe_read_fd, pipe_write_fd = os.pipe()

        self._read_hdl = os.fdopen(pipe_read_fd, 'rb')
        self._write_hdl = os.fdopen(pipe_write_fd, 'wb')
        # Reference to the raw pipe write handle, kept even if write_hdl is replaced.
        self._close_hdl = self._write_hdl

        # Set by stop(); checked by the writer thread before each chunk.
        self._stop = False
        self._t = None

    @property
    def write_hdl(self):
        """ The handle chunks are written to; replaceable before start() """
        return self._write_hdl

    @write_hdl.setter
    def write_hdl(self, new_write_hdl):
        self._write_hdl = new_write_hdl

    @property
    def read_hdl(self):
        """ The read end of the pipe """
        return self._read_hdl

    def _write_chunk(self, chunk):
        """ Write one chunk; return False (after logging) if the pipe is closed """
        try:
            self._write_hdl.write(chunk)
        except (BrokenPipeError, ValueError) as e:
            self._log.warning("write pipe closed: %s", str(e))
            return False
        return True

    def _gen_writer(self):
        """ Thread body: pump generator chunks into the write handle, then close it """
        self._log.debug("starting image data write to pipe")
        try:
            for chunk in self._data_gen:
                if self._stop:
                    break
                if not self._write_chunk(chunk):
                    return
        except Exception as e:
            self._log.exception("error when writing data to pipe: %s", str(e))
        finally:
            # Closing the write side is what delivers EOF to the reader.
            self._log.debug("closing write side of pipe")
            try:
                self._write_hdl.close()
            except OSError:
                pass

    def start(self):
        """ Launch the daemon writer thread """
        writer = threading.Thread(target=self._gen_writer, daemon=True)
        writer.start()
        self._t = writer

    def stop(self):
        """ Ask the writer thread to stop and wait up to one second for it """
        self._log.debug("stop requested, closing write side of pipe")
        self._stop = True
        writer = self._t
        if writer is None:
            return
        writer.join(timeout=1)
478
479
class AccountImageUploadTask(object):
    """ This class manages a create_image task built from an image info and a file handle

    Manage the upload of an image to a configured cloud account.  The task
    state only ever moves forward through STATES; the state setter asserts this.
    """
    STATES = ("QUEUED", "CHECK_IMAGE_EXISTS", "UPLOADING", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")

    TIMEOUT_CHECK_EXISTS = 10
    TIMEOUT_IMAGE_UPLOAD = 6 * 60 * 60  # 6 hours

    def __init__(self, log, loop, account, image_info, image_hdl,
                 timeout_exists=TIMEOUT_CHECK_EXISTS, timeout_upload=TIMEOUT_IMAGE_UPLOAD,
                 progress_info=None, write_canceller=None
                 ):
        """
        Arguments:
            log - logger
            loop - asyncio event loop the task runs on
            account - cloud account (provides name, get_image_list(), create_image())
            image_info - image description; must have a "name" field (it is deep-copied)
            image_hdl - file handle whose fileno() is passed to create_image
            timeout_exists - seconds allowed for the "image already exists" check
            timeout_upload - seconds allowed for the upload itself
            progress_info - optional object exposing bytes_written/bytes_total/
                            progress_percent/bytes_rate for progress reporting
            write_canceller - optional object whose stop() is called on cancellation

        Raises:
            ValueError - image_info has no name field
        """
        self._log = log
        self._loop = loop
        self._account = account
        self._image_info = image_info.deep_copy()
        self._image_hdl = image_hdl

        self._timeout_exists = timeout_exists
        self._timeout_upload = timeout_upload

        self._progress_info = progress_info
        self._write_canceller = write_canceller

        self._state = "QUEUED"
        self._state_stack = [self._state]

        self._detail = "Task is waiting to be started"
        self._start_time = time.time()
        self._stop_time = 0
        # Future of the running _do_upload task; None before start and after it finishes.
        self._upload_future = None

        if not image_info.has_field("name"):
            raise ValueError("image info must have name field")

    def __repr__(self):
        return "{}(cloud_account={}, image_name={}, state={})".format(
            self.__class__.__name__, self.cloud_account, self.image_name, self._state
        )

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, new_state):
        states = AccountImageUploadTask.STATES
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state

    @property
    def state_stack(self):
        return self._state_stack

    @property
    def image_id(self):
        """ The id of the image being uploaded (set once the upload succeeds) """
        return self._image_info.id

    @property
    def image_name(self):
        """ The image name being uploaded """
        return self._image_info.name

    @property
    def image_checksum(self):
        """ The image checksum being uploaded, or None if not provided """
        if self._image_info.has_field("checksum"):
            return self._image_info.checksum

        return None

    @property
    def cloud_account(self):
        """ The cloud account name which the image is being uploaded to """
        return self._account.name

    @property
    def pb_msg(self):
        """ The UploadTask protobuf message """
        task = RwImageMgmtYang.YangData_RwProject_Project_UploadJobs_Job_UploadTasks.from_dict({
            "cloud_account": self.cloud_account,
            "image_id": self.image_id,
            "image_name": self.image_name,
            "status": self.state,
            "detail": self._detail,
            "start_time": self._start_time,
        })

        if self.image_checksum is not None:
            task.image_checksum = self.image_checksum

        if self._stop_time:
            task.stop_time = self._stop_time

        if self._progress_info:
            task.bytes_written = self._progress_info.bytes_written
            task.bytes_total = self._progress_info.bytes_total
            task.progress_percent = self._progress_info.progress_percent
            task.bytes_per_second = self._progress_info.bytes_rate

            if self.state == "COMPLETED":
                task.progress_percent = 100

        return task

    def _get_account_images(self):
        """ Return the account's image list; raises ImageListError on CAL failure """
        account_images = []
        self._log.debug("getting image list for account {}".format(self._account.name))
        try:
            account_images = self._account.get_image_list()
        except rift.mano.cloud.CloudAccountCalError as e:
            msg = "could not get image list for account {}".format(self._account.name)
            self._log.error(msg)
            raise ImageListError(msg) from e

        return account_images

    def _has_existing_image(self):
        """ True if the account already has an image with this name (and checksum, if known) """
        account = self._account

        account_images = self._get_account_images()

        matching_images = [i for i in account_images if i.name == self.image_name]

        if self.image_checksum is not None:
            matching_images = [i for i in matching_images if i.checksum == self.image_checksum]

        if matching_images:
            self._log.debug("found matching image with checksum in account %s",
                            account.name)
            return True

        self._log.debug("did not find matching image with checksum in account %s",
                        account.name)
        return False

    def _upload_image(self):
        """ Create the image on the account from image_hdl; returns the new image id

        Raises:
            ImageUploadError - the CAL reported an error
        """
        image = self._image_info
        account = self._account

        image.fileno = self._image_hdl.fileno()

        self._log.debug("uploading to account {}: {}".format(account.name, image))
        try:
            image.id = account.create_image(image)
        except rift.mano.cloud.CloudAccountCalError as e:
            msg = "error when uploading image {} to cloud account: {}".format(image.name, str(e))
            self._log.error(msg)
            raise ImageUploadError(msg) from e

        self._log.debug('uploaded image (id: {}) to account {}: {}'.format(
            image.id, account.name, image.name))

        return image.id

    @asyncio.coroutine
    def _do_upload(self):
        try:
            self.state = "CHECK_IMAGE_EXISTS"
            has_image = yield from asyncio.wait_for(
                self._loop.run_in_executor(None, self._has_existing_image),
                timeout=self._timeout_exists,
                loop=self._loop
            )
            if has_image:
                self.state = "COMPLETED"
                self._detail = "Image already exists on destination"
                return

            self.state = "UPLOADING"
            self._detail = "Uploading image"

            # Note that if the upload times out, the upload thread may still
            # stick around. We'll need another method of cancelling the task
            # through the VALA interface.
            image_id = yield from asyncio.wait_for(
                self._loop.run_in_executor(None, self._upload_image),
                timeout=self._timeout_upload,
                loop=self._loop
            )

        except asyncio.CancelledError:
            self.state = "CANCELLED"
            self._detail = "Image upload cancelled"

        except ImageUploadTaskError as e:
            self.state = "FAILED"
            self._detail = str(e)

        except asyncio.TimeoutError as e:
            self.state = "FAILED"
            self._detail = "Timed out during upload task: %s" % str(e)

        except Exception as e:
            # Any other error (e.g. one the account layer does not wrap in
            # CloudAccountCalError) would otherwise leave the task stuck in
            # CHECK_IMAGE_EXISTS/UPLOADING.  Record the failure, then let the
            # exception propagate as it did before.
            self._log.exception("%s failed with an unexpected error", self)
            self.state = "FAILED"
            self._detail = "Unexpected error during upload task: %s" % str(e)
            raise

        else:
            # If the user does not provide a checksum and performs a URL source
            # upload with an incorrect URL, then Glance does not indicate a failure
            # and the CAL cannot detect an incorrect upload. In this case, use
            # the bytes_written to detect a bad upload and mark the task as failed.
            if self._progress_info and self._progress_info.bytes_written == 0:
                self.state = "FAILED"
                self._detail = "No bytes written. Possible bad image source."
                return

            self.state = "COMPLETED"
            self._detail = "Image successfully uploaded. Image id: %s" % image_id

        finally:
            self._stop_time = time.time()
            self._upload_future = None

    @asyncio.coroutine
    def wait(self):
        """ Wait for the upload task to complete

        Raises:
            ImageUploadError - the task is not running (never started or already finished)
        """
        if self._upload_future is None:
            raise ImageUploadError("Task not started")

        yield from asyncio.wait_for(
            self._upload_future,
            self._timeout_upload, loop=self._loop
        )

    def start(self):
        """ Start the upload task

        Returns:
            The future of the task running the upload

        Raises:
            ImageUploadError - the task was already started
        """
        if self._state != "QUEUED":
            raise ImageUploadError("Task already started")

        self._log.info("Starting %s", self)

        self._upload_future = self._loop.create_task(self._do_upload())

        return self._upload_future

    def stop(self):
        """ Stop the upload task in progress """
        if self._upload_future is None:
            self._log.warning("Cannot cancel %s. Not in progress.", self)
            return

        # _do_upload leaves QUEUED in its very first step, so QUEUED here
        # means the upload task exists but has not run yet.
        not_yet_running = self._state == "QUEUED"

        self.state = "CANCELLING"
        self._detail = "Cancellation has been requested"

        self._log.info("Cancelling %s", self)
        self._upload_future.cancel()
        if self._write_canceller is not None:
            self._write_canceller.stop()

        if not_yet_running:
            # A task cancelled before its first step never executes its
            # coroutine body, so the CancelledError handler and the finally
            # clause of _do_upload will not run.  Finish the bookkeeping here.
            self.state = "CANCELLED"
            self._detail = "Image upload cancelled"
            self._stop_time = time.time()
            self._upload_future = None