New feature: Code changes for project support
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / upload.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import collections
20 import itertools
21 import os
22 import time
23 import threading
24
25 import rift.mano.cloud
26
27 import gi
28 gi.require_version('RwImageMgmtYang', '1.0')
29 from gi.repository import (
30 RwImageMgmtYang,
31 )
32
33
class UploadJobError(Exception):
    """ Raised for invalid ImageUploadJob lifecycle operations
    (waiting on a job that was never started, starting a job twice). """
36
37
class ImageUploadTaskError(Exception):
    """ Common base class for errors raised while running an image upload task. """
40
41
class ImageUploadError(ImageUploadTaskError):
    """ Raised when an upload task is misused (not started / already started)
    or when creating the image on the cloud account fails. """
44
45
class ImageListError(ImageUploadTaskError):
    """ Raised when the image list of a cloud account cannot be retrieved. """
48
49
class ImageUploadJobController(object):
    """ This class starts and manages ImageUploadJobs

    Every job created through the controller is tracked by id. When a job
    finishes it is also recorded in a bounded list of completed jobs; once that
    list is full, the oldest completed job is dropped from tracking entirely.
    """
    MAX_COMPLETED_JOBS = 20

    def __init__(self, log, loop, project, max_completed_jobs=MAX_COMPLETED_JOBS):
        """ Create a job controller

        Arguments:
            log - logger used for debug output
            loop - asyncio event loop handed to every created ImageUploadJob
            project - project context (stored; not used by this class directly)
            max_completed_jobs - number of completed jobs that remain tracked.
                None keeps all of them; 0 forgets a job as soon as it completes.
        """
        self._log = log
        self._loop = loop
        self._project = project
        self._job_id_gen = itertools.count(1)
        self._max_completed_jobs = max_completed_jobs

        # job id -> ImageUploadJob for every job that is still tracked
        self._jobs = {}
        # Most recently completed job on the left, oldest on the right.
        self._completed_jobs = collections.deque(
            maxlen=self._max_completed_jobs
        )

    @property
    def pb_msg(self):
        """ the UploadJobs protobuf message """
        upload_jobs_msg = RwImageMgmtYang.UploadJobs()
        for job in self._jobs.values():
            upload_jobs_msg.job.append(job.pb_msg)

        return upload_jobs_msg

    @property
    def jobs(self):
        """ the tracked list of ImageUploadJobs """
        return self._jobs.values()

    @property
    def completed_jobs(self):
        """ completed jobs in the tracked list of ImageUploadJobs """
        return [job for job in self._jobs.values() if job in self._completed_jobs]

    @property
    def active_jobs(self):
        """ in-progress jobs in the tracked list of ImageUploadJobs """
        return [job for job in self._jobs.values() if job not in self._completed_jobs]

    def _add_job(self, job):
        self._jobs[job.id] = job

    def _start_job(self, job, on_completed=None):
        """ Start a job and arrange for completion bookkeeping

        Arguments:
            job - the job to start; its start() must return a future
            on_completed - optional extra done-callback added to that future
        """
        def on_job_completed(_):
            self._log.debug("%s completed. Adding to completed jobs list.", job)

            if self._completed_jobs.maxlen == 0:
                # No completed jobs are retained: a zero-length deque is always
                # "full" yet empty, so there is no oldest entry to evict.
                # Stop tracking this job immediately instead.
                self._jobs.pop(job.id, None)
                return

            # If adding a new completed job is going to overflow the
            # completed job list, find the first job that completed and
            # remove it from the tracked jobs. (The deque itself drops that
            # entry on appendleft because of its maxlen.)
            if len(self._completed_jobs) == self._completed_jobs.maxlen:
                oldest_completed_job = self._completed_jobs[-1]
                self._jobs.pop(oldest_completed_job.id, None)

            self._completed_jobs.appendleft(job)

        job_future = job.start()
        job_future.add_done_callback(on_job_completed)

        if on_completed is not None:
            job_future.add_done_callback(on_completed)

    def get_job(self, job_id):
        """ Get the UploadJob from the job id

        Arguments:
            job_id - the job id that was previously added to the controller

        Returns:
            The associated ImageUploadJob

        Raises:
            LookupError - Could not find the job id
        """
        if job_id not in self._jobs:
            raise LookupError("Could not find job_id %s" % job_id)

        return self._jobs[job_id]

    def create_job(self, image_tasks, on_completed=None):
        """ Create and start a ImageUploadJob from a list of ImageUploadTasks

        Arguments:
            image_tasks - a list of ImageUploadTasks
            on_completed - a callback which is added to the job future

        Returns:
            A ImageUploadJob id
        """
        self._log.debug("Creating new job from %s image tasks", len(image_tasks))
        new_job = ImageUploadJob(
            self._log,
            self._loop,
            image_tasks,
            job_id=next(self._job_id_gen)
        )

        self._add_job(new_job)
        self._start_job(new_job, on_completed=on_completed)

        return new_job.id
151
152
class ImageUploadJob(object):
    """ This class manages a set of ImageUploadTasks

    In order to push an image (or set of images) to many cloud accounts, and get a single
    status on that operation, we need a single status that represents all of those tasks.

    The ImageUploadJob provides a single endpoint to control all the tasks and report
    when all images are successfully uploaded or when any one fails.
    """
    STATES = ("QUEUED", "IN_PROGRESS", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
    TIMEOUT_JOB = 6 * 60 * 60  # 6 hours
    JOB_GEN = itertools.count(1)

    def __init__(self, log, loop, upload_tasks, job_id=None, timeout_job=TIMEOUT_JOB):
        """ Create a job in the QUEUED state

        Arguments:
            log - logger
            loop - asyncio event loop the job coroutine is scheduled on
            upload_tasks - sized collection of tasks; each must provide
                start(), wait(), stop(), state and pb_msg
            job_id - explicit job id; drawn from JOB_GEN when None
            timeout_job - seconds to wait for the upload tasks (also used by wait())
        """
        self._log = log
        self._loop = loop
        self._upload_tasks = upload_tasks
        self._job_id = next(ImageUploadJob.JOB_GEN) if job_id is None else job_id
        self._timeout_job = timeout_job

        self._state = "QUEUED"
        self._state_stack = [self._state]

        self._start_time = time.time()
        self._stop_time = 0

        self._task_future_map = {}
        # Set by start(); cleared again once the job coroutine finishes.
        self._job_future = None

    def __repr__(self):
        return "{}(job_id={}, state={})".format(
            self.__class__.__name__, self._job_id, self._state
        )

    @property
    def id(self):
        return self._job_id

    @property
    def state(self):
        """ The state of the ImageUploadJob """
        return self._state

    @state.setter
    def state(self, new_state):
        """ Set the state of the ImageUploadJob (states may only move forward in STATES) """
        states = ImageUploadJob.STATES
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state

    @property
    def state_stack(self):
        """ The list of states that this job progressed through """
        return self._state_stack

    @property
    def pb_msg(self):
        """ The UploadJob protobuf message """
        job_msg = RwImageMgmtYang.UploadJob.from_dict({
            "id": self._job_id,
            "status": self._state,
            "start_time": self._start_time,
            "upload_tasks": [upload_task.pb_msg for upload_task in self._upload_tasks]
        })

        if self._stop_time:
            job_msg.stop_time = self._stop_time

        return job_msg

    def _start_upload_tasks(self):
        self._log.debug("Starting %s upload tasks", len(self._upload_tasks))

        for upload_task in self._upload_tasks:
            upload_task.start()

    @asyncio.coroutine
    def _wait_for_upload_tasks(self):
        self._log.debug("Waiting for upload tasks to complete")

        wait_coroutines = [t.wait() for t in self._upload_tasks]
        if wait_coroutines:
            # Tasks still running after the timeout are not cancelled here;
            # _set_final_job_state() reports them as failed.
            yield from asyncio.wait(
                wait_coroutines,
                timeout=self._timeout_job,
                loop=self._loop
            )

        self._log.debug("All upload tasks completed")

    def _set_final_job_state(self):
        failed_tasks = [task for task in self._upload_tasks if task.state != "COMPLETED"]

        if failed_tasks:
            self._log.error("%s had %s FAILED tasks.", self, len(failed_tasks))
            self.state = "FAILED"
        else:
            self._log.debug("%s tasks completed successfully", len(self._upload_tasks))
            self.state = "COMPLETED"

    @asyncio.coroutine
    def _cancel_job(self):
        for task in self._upload_tasks:
            task.stop()

        # TODO: Wait for all tasks to actually reach terminal
        # states.

        self.state = "CANCELLED"

    @asyncio.coroutine
    def _do_job(self):
        # This must stay the first statement: stop() relies on the job
        # leaving QUEUED as soon as the coroutine takes its first step.
        self.state = "IN_PROGRESS"
        self._start_upload_tasks()
        try:
            yield from self._wait_for_upload_tasks()
        except asyncio.CancelledError:
            self._log.debug("%s was cancelled. Cancelling all tasks.",
                            self)
            self._loop.create_task(self._cancel_job())
            raise
        finally:
            self._stop_time = time.time()
            self._job_future = None

        self._set_final_job_state()

    @asyncio.coroutine
    def wait(self):
        """ Wait for the job to reach a terminal state

        Returns immediately if the job has already finished.

        Raises:
            UploadJobError - the job was never started
        """
        if self._job_future is None:
            if self._state == "QUEUED":
                raise UploadJobError("Job not started")
            # The job already ran; _do_job() clears the future when it exits.
            return

        yield from asyncio.wait_for(
            self._job_future,
            self._timeout_job,
            loop=self._loop
        )

    def start(self):
        """ Start the job and all child tasks

        Returns:
            The asyncio task running the job

        Raises:
            UploadJobError - the job was already started
        """
        # The state only leaves QUEUED once the job coroutine takes its first
        # step, so also check the future to reject a second start() in between.
        if self._state != "QUEUED" or self._job_future is not None:
            raise UploadJobError("Job already started")

        self._job_future = self._loop.create_task(self._do_job())
        return self._job_future

    def stop(self):
        """ Stop the job and all child tasks (no-op if the job is not running) """
        if self._job_future is None:
            return

        not_yet_running = self._state == "QUEUED"
        self.state = "CANCELLING"
        self._job_future.cancel()

        if not_yet_running:
            # _do_job() has not executed its first statement yet, so the cancel
            # above prevents its body - including its cleanup - from ever
            # running, and no upload task has been started. Finish the
            # bookkeeping here so the job does not stay in CANCELLING forever.
            self.state = "CANCELLED"
            self._stop_time = time.time()
            self._job_future = None
311
312
class ByteRateCalculator(object):
    """ This class produces a smoothed byte rate from inputted measurements

    Each measurement moves the current rate 1/rate_time_constant of the way
    towards the measurement's instantaneous rate (an exponential moving
    average). While the current rate is 0, the next measurement is taken as-is.
    """
    def __init__(self, rate_time_constant):
        """
        Arguments:
            rate_time_constant - smoothing divisor; larger values react more slowly
        """
        self._rate = 0
        self._time_constant = rate_time_constant

    @property
    def rate(self):
        """ The current smoothed rate in bytes per unit of time_delta """
        return self._rate

    def add_measurement(self, num_bytes, time_delta):
        """ Fold a new measurement into the rate

        Arguments:
            num_bytes - bytes transferred during the interval
            time_delta - length of the interval

        Returns:
            The updated smoothed rate. Measurements with a non-positive
            time_delta carry no rate information and leave the rate unchanged.
        """
        if time_delta <= 0:
            # Avoid ZeroDivisionError, and avoid negative rates when a
            # non-monotonic clock steps backwards.
            return self._rate

        sample_rate = num_bytes / time_delta
        if self._rate == 0:
            self._rate = sample_rate
        else:
            self._rate += (sample_rate - self._rate) / self._time_constant

        return self._rate
331
332
class UploadProgressWriteProxy(object):
    """ This class implements a write proxy which produces various progress stats

    In order to keep the complexity of the UploadTask down, this class acts as a
    proxy for a file write. By providing the original handle to be written to
    and having the client class call write() on this object, we can produce the
    various statistics to be consumed.
    """
    RATE_TIME_CONSTANT = 5

    def __init__(self, log, loop, bytes_total, write_hdl):
        """
        Arguments:
            log - logger
            loop - asyncio event loop the rate monitoring task runs on
            bytes_total - expected total number of bytes; progress_percent
                reports 0 when this is 0
            write_hdl - handle that write() and close() are forwarded to
        """
        self._log = log
        self._loop = loop
        self._bytes_total = bytes_total
        self._write_hdl = write_hdl

        self._bytes_written = 0
        self._byte_rate = 0

        self._rate_calc = ByteRateCalculator(UploadProgressWriteProxy.RATE_TIME_CONSTANT)
        self._rate_task = None

    def write(self, data):
        """ Forward data to the wrapped handle and count the bytes written """
        self._write_hdl.write(data)
        self._bytes_written += len(data)

    def close(self):
        """ Close the wrapped handle and stop rate monitoring

        The rate task is cancelled even if closing the handle raises, so a
        failed close cannot leave the periodic task running forever.
        """
        try:
            self._write_hdl.close()
        finally:
            if self._rate_task is not None:
                self._log.debug("stopping rate monitoring task")
                self._rate_task.cancel()
                self._rate_task = None

    def start_rate_monitoring(self):
        """ Start the rate monitoring task

        About once per second the number of bytes written during the interval
        is fed to the rate calculator. A previously started monitoring task is
        cancelled first so repeated calls do not leave orphaned tasks behind.
        """
        if self._rate_task is not None:
            self._rate_task.cancel()

        @asyncio.coroutine
        def periodic_rate_task():
            while True:
                # Use a monotonic clock: wall-clock adjustments must not
                # produce zero or negative measurement intervals.
                start_time = time.monotonic()
                start_bytes = self._bytes_written
                yield from asyncio.sleep(1, loop=self._loop)
                time_period = time.monotonic() - start_time
                num_bytes = self._bytes_written - start_bytes

                self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)

        self._log.debug("starting rate monitoring task")
        self._rate_task = self._loop.create_task(periodic_rate_task())

    @property
    def progress_percent(self):
        """ Integer percentage of bytes_total written so far (0 if bytes_total is 0) """
        if self._bytes_total == 0:
            return 0

        return int(self._bytes_written / self._bytes_total * 100)

    @property
    def bytes_written(self):
        return self._bytes_written

    @property
    def bytes_total(self):
        return self._bytes_total

    @property
    def bytes_rate(self):
        """ Smoothed write rate in bytes per second, updated by the monitoring task """
        return self._byte_rate
399
400
class GlanceImagePipeGen(object):
    """ This class produces a read file handle from a generator that produces bytes

    The CAL API takes a file handle as an input. The Glance API creates a generator
    that produces byte strings. This class acts as the mediator by creating a pipe
    and pumping the bytestring from the generator into the write side of the pipe.

    A pipe has the useful feature here that it will block at the buffer size until
    the reader has consumed. This allows us to only pull from glance and push at the
    pace of the reader preventing us from having to store the images locally on disk.
    """
    def __init__(self, log, loop, data_gen):
        """
        Arguments:
            log - logger
            loop - event loop (stored; the pipe pump itself runs in a thread)
            data_gen - iterable producing byte strings to push into the pipe
        """
        self._log = log
        self._loop = loop
        self._data_gen = data_gen

        read_fd, write_fd = os.pipe()

        self._read_hdl = os.fdopen(read_fd, 'rb')
        self._write_hdl = os.fdopen(write_fd, 'wb')
        # Keeps a reference to the raw write side of the pipe even if
        # write_hdl is later replaced through the setter.
        self._close_hdl = self._write_hdl

    @property
    def write_hdl(self):
        """ The handle the generator data is written to (replaceable via the setter) """
        return self._write_hdl

    @write_hdl.setter
    def write_hdl(self, new_write_hdl):
        self._write_hdl = new_write_hdl

    @property
    def read_hdl(self):
        """ The read side of the pipe, to be consumed by the uploader """
        return self._read_hdl

    def _gen_writer(self):
        """ Thread body: copy every chunk from the generator into write_hdl, then close it """
        self._log.debug("starting image data write to pipe")
        try:
            for data in self._data_gen:
                try:
                    self._write_hdl.write(data)
                except (BrokenPipeError, ValueError) as e:
                    # The reader went away or stop() closed the handle.
                    self._log.warning("write pipe closed: %s", str(e))
                    return

        except Exception as e:
            self._log.exception("error when writing data to pipe: %s", str(e))

        finally:
            # Closing the write side signals EOF to the reader.
            self._log.debug("closing write side of pipe")
            try:
                self._write_hdl.close()
            except OSError:
                pass

    def start(self):
        """ Start pumping generator data into the pipe on a daemon thread """
        t = threading.Thread(target=self._gen_writer)
        t.daemon = True
        t.start()

    def stop(self):
        """ Close the write side of the pipe to abort the transfer

        Closing is best effort, matching _gen_writer: an OSError (for example
        BrokenPipeError while flushing after the reader has gone away) is
        logged instead of propagating to the caller.
        """
        self._log.debug("stop requested, closing write side of pipe")
        try:
            self._write_hdl.close()
        except OSError as e:
            self._log.warning("error closing write side of pipe: %s", str(e))
463
464
class AccountImageUploadTask(object):
    """ This class manages a create_image task from an image info and file handle

    Manage the upload of an image to a configured cloud account.
    """
    STATES = ("QUEUED", "CHECK_IMAGE_EXISTS", "UPLOADING", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")

    TIMEOUT_CHECK_EXISTS = 10
    TIMEOUT_IMAGE_UPLOAD = 6 * 60 * 60  # 6 hours

    def __init__(self, log, loop, account, image_info, image_hdl,
                 timeout_exists=TIMEOUT_CHECK_EXISTS, timeout_upload=TIMEOUT_IMAGE_UPLOAD,
                 progress_info=None, write_canceller=None
                 ):
        """ Create an upload task in the QUEUED state

        Arguments:
            log - logger
            loop - asyncio event loop the upload coroutine runs on
            account - cloud account providing name, get_image_list() and create_image()
            image_info - image message with the name field set; it is deep-copied
            image_hdl - handle whose fileno() is handed to create_image()
            timeout_exists - seconds allowed for the existing-image check
            timeout_upload - seconds allowed for the upload (also used by wait())
            progress_info - optional object exposing bytes_written, bytes_total,
                progress_percent and bytes_rate
            write_canceller - optional object whose stop() is called on cancellation

        Raises:
            ValueError - image_info has no name field
        """
        if not image_info.has_field("name"):
            raise ValueError("image info must have name field")

        self._log = log
        self._loop = loop
        self._account = account
        self._image_info = image_info.deep_copy()
        self._image_hdl = image_hdl

        self._timeout_exists = timeout_exists
        self._timeout_upload = timeout_upload

        self._progress_info = progress_info
        self._write_canceller = write_canceller

        self._state = "QUEUED"
        self._state_stack = [self._state]

        self._detail = "Task is waiting to be started"
        self._start_time = time.time()
        self._stop_time = 0
        # Set by start(); cleared again once the upload coroutine finishes.
        self._upload_future = None

    def __repr__(self):
        return "{}(cloud_account={}, image_name={}, state={})".format(
            self.__class__.__name__, self._account.name, self._image_info.name, self._state
        )

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, new_state):
        """ Set the task state (states may only move forward in STATES) """
        states = AccountImageUploadTask.STATES
        assert new_state in states
        assert states.index(new_state) >= states.index(self._state)
        self._state_stack.append(new_state)

        self._state = new_state

    @property
    def state_stack(self):
        """ The list of states that this task progressed through """
        return self._state_stack

    @property
    def image_id(self):
        """ The id of the image being uploaded """
        return self._image_info.id

    @property
    def image_name(self):
        """ The image name being uploaded """
        return self._image_info.name

    @property
    def image_checksum(self):
        """ The image checksum being uploaded, or None if not provided """
        if self._image_info.has_field("checksum"):
            return self._image_info.checksum

        return None

    @property
    def cloud_account(self):
        """ The cloud account name which the image is being uploaded to """
        return self._account.name

    @property
    def pb_msg(self):
        """ The UploadTask protobuf message """
        task_msg = RwImageMgmtYang.UploadTask.from_dict({
            "cloud_account": self.cloud_account,
            "image_id": self.image_id,
            "image_name": self.image_name,
            "status": self.state,
            "detail": self._detail,
            "start_time": self._start_time,
        })

        if self.image_checksum is not None:
            task_msg.image_checksum = self.image_checksum

        if self._stop_time:
            task_msg.stop_time = self._stop_time

        if self._progress_info:
            task_msg.bytes_written = self._progress_info.bytes_written
            task_msg.bytes_total = self._progress_info.bytes_total
            task_msg.progress_percent = self._progress_info.progress_percent
            task_msg.bytes_per_second = self._progress_info.bytes_rate

            if self.state == "COMPLETED":
                task_msg.progress_percent = 100

        return task_msg

    def _get_account_images(self):
        """ Return the account's image list (blocking; run in an executor)

        Raises:
            ImageListError - the cloud account call failed
        """
        self._log.debug("getting image list for account {}".format(self._account.name))
        try:
            return self._account.get_image_list()
        except rift.mano.cloud.CloudAccountCalError as e:
            msg = "could not get image list for account {}".format(self._account.name)
            self._log.error(msg)
            raise ImageListError(msg) from e

    def _has_existing_image(self):
        """ Whether the account already has an image with this name (and checksum, if known) """
        account = self._account

        account_images = self._get_account_images()

        matching_images = [i for i in account_images if i.name == self.image_name]

        if self.image_checksum is not None:
            matching_images = [i for i in matching_images if i.checksum == self.image_checksum]

        if matching_images:
            self._log.debug("found matching image with checksum in account %s",
                            account.name)
            return True

        self._log.debug("did not find matching image with checksum in account %s",
                        account.name)
        return False

    def _upload_image(self):
        """ Create the image on the account (blocking; run in an executor)

        Returns:
            The id assigned to the new image

        Raises:
            ImageUploadError - the cloud account call failed
        """
        image = self._image_info
        account = self._account

        image.fileno = self._image_hdl.fileno()

        self._log.debug("uploading to account {}: {}".format(account.name, image))
        try:
            image.id = account.create_image(image)
        except rift.mano.cloud.CloudAccountCalError as e:
            msg = "error when uploading image {} to cloud account: {}".format(image.name, str(e))
            self._log.error(msg)
            raise ImageUploadError(msg) from e

        self._log.debug('uploaded image (id: {}) to account {}: {}'.format(
            image.id, account.name, image.name))

        return image.id

    @asyncio.coroutine
    def _do_upload(self):
        try:
            # This must stay the first statement: stop() relies on the task
            # leaving QUEUED as soon as the coroutine takes its first step.
            self.state = "CHECK_IMAGE_EXISTS"
            has_image = yield from asyncio.wait_for(
                self._loop.run_in_executor(None, self._has_existing_image),
                timeout=self._timeout_exists,
                loop=self._loop
            )
            if has_image:
                self.state = "COMPLETED"
                self._detail = "Image already exists on destination"
                return

            self.state = "UPLOADING"
            self._detail = "Uploading image"

            # Note that if the upload times out, the upload thread may still
            # stick around. We'll need another method of cancelling the task
            # through the VALA interface.
            image_id = yield from asyncio.wait_for(
                self._loop.run_in_executor(None, self._upload_image),
                timeout=self._timeout_upload,
                loop=self._loop
            )

        except asyncio.CancelledError:
            self.state = "CANCELLED"
            self._detail = "Image upload cancelled"

        except ImageUploadTaskError as e:
            self.state = "FAILED"
            self._detail = str(e)

        except asyncio.TimeoutError as e:
            self.state = "FAILED"
            self._detail = "Timed out during upload task: %s" % str(e)

        except Exception as e:
            # Any other error (e.g. unexpected failure inside the cloud account
            # driver) would otherwise leave the task stuck in an intermediate
            # state with an exception nobody retrieves; record it as a failure.
            self._log.exception("%s failed unexpectedly: %s", self, str(e))
            self.state = "FAILED"
            self._detail = "Unexpected error during upload task: %s" % str(e)

        else:
            # If the user does not provide a checksum and performs a URL source
            # upload with an incorrect URL, then Glance does not indicate a failure
            # and the CAL cannot detect an incorrect upload. In this case, use
            # the bytes_written to detect a bad upload and mark the task as failed.
            if self._progress_info and self._progress_info.bytes_written == 0:
                self.state = "FAILED"
                self._detail = "No bytes written. Possible bad image source."
                return

            self.state = "COMPLETED"
            self._detail = "Image successfully uploaded. Image id: %s" % image_id

        finally:
            self._stop_time = time.time()
            self._upload_future = None

    @asyncio.coroutine
    def wait(self):
        """ Wait for the upload task to complete

        Returns immediately if the task has already finished.

        Raises:
            ImageUploadError - the task was never started
        """
        if self._upload_future is None:
            if self._state == "QUEUED":
                raise ImageUploadError("Task not started")
            # Already finished; _do_upload() clears the future when it exits.
            return

        yield from asyncio.wait_for(
            self._upload_future,
            self._timeout_upload, loop=self._loop
        )

    def start(self):
        """ Start the upload task

        Returns:
            The asyncio task running the upload

        Raises:
            ImageUploadError - the task was already started
        """
        # The state only leaves QUEUED once the upload coroutine takes its
        # first step, so also check the future to reject a second start().
        if self._state != "QUEUED" or self._upload_future is not None:
            raise ImageUploadError("Task already started")

        self._log.info("Starting %s", self)

        self._upload_future = self._loop.create_task(self._do_upload())

        return self._upload_future

    def stop(self):
        """ Stop the upload task in progress """
        if self._upload_future is None:
            self._log.warning("Cannot cancel %s. Not in progress.", self)
            return

        not_yet_running = self._state == "QUEUED"

        self.state = "CANCELLING"
        self._detail = "Cancellation has been requested"

        self._log.info("Cancelling %s", self)
        self._upload_future.cancel()

        if not_yet_running:
            # _do_upload() has not executed its first statement yet, so the
            # cancel above prevents its body - including the except/finally
            # clauses that record the outcome - from ever running. Record the
            # cancellation here so the task does not stay in CANCELLING.
            self.state = "CANCELLED"
            self._detail = "Image upload cancelled"
            self._stop_time = time.time()
            self._upload_future = None

        if self._write_canceller is not None:
            self._write_canceller.stop()