RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / upload.py
diff --git a/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/upload.py b/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/upload.py
new file mode 100644 (file)
index 0000000..7ce74b2
--- /dev/null
@@ -0,0 +1,709 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import collections
+import itertools
+import os
+import time
+import threading
+
+import rift.mano.cloud
+
+import gi
+gi.require_version('RwImageMgmtYang', '1.0')
+from gi.repository import (
+        RwImageMgmtYang,
+        )
+
+
+class UploadJobError(Exception):
+    pass
+
+
+class ImageUploadTaskError(Exception):
+    pass
+
+
+class ImageUploadError(ImageUploadTaskError):
+    pass
+
+
+class ImageListError(ImageUploadTaskError):
+    pass
+
+
+class ImageUploadJobController(object):
+    """ This class starts and manages ImageUploadJobs """
+    MAX_COMPLETED_JOBS = 20
+
+    def __init__(self, log, loop, max_completed_jobs=MAX_COMPLETED_JOBS):
+        self._log = log
+        self._loop = loop
+        self._job_id_gen = itertools.count(1)
+        self._max_completed_jobs = max_completed_jobs
+
+        self._jobs = {}
+        self._completed_jobs = collections.deque(
+                maxlen=self._max_completed_jobs
+                )
+
+    @property
+    def pb_msg(self):
+        """ the UploadJobs protobuf message """
+        upload_jobs_msg = RwImageMgmtYang.UploadJobs()
+        for job in self._jobs.values():
+            upload_jobs_msg.job.append(job.pb_msg)
+
+        return upload_jobs_msg
+
+    @property
+    def jobs(self):
+        """ the tracked list of ImageUploadJobs """
+        return self._jobs.values()
+
+    @property
+    def completed_jobs(self):
+        """ completed jobs in the tracked list of ImageUploadJobs """
+        return [job for job in self._jobs.values() if job in self._completed_jobs]
+
+    @property
+    def active_jobs(self):
+        """ in-progress jobs in the tracked list of ImageUploadJobs """
+        return [job for job in self._jobs.values() if job not in self._completed_jobs]
+
+    def _add_job(self, job):
+        self._jobs[job.id] = job
+
+    def _start_job(self, job, on_completed=None):
+        def on_job_completed(_):
+            self._log.debug("%s completed.  Adding to completed jobs list.", job)
+
+            # If adding a new completed job would overflow the completed job
+            # list, drop the oldest completed job from the tracked jobs.
+            if len(self._completed_jobs) == self._completed_jobs.maxlen:
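+                # appendleft() below puts the newest completion at index 0,
+                # so the oldest completed job sits at the right end of the deque.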
+                first_completed_job = self._completed_jobs[-1]
+                del self._jobs[first_completed_job.id]
+
+            self._completed_jobs.appendleft(job)
+
+        job_future = job.start()
+        job_future.add_done_callback(on_job_completed)
+
+        if on_completed is not None:
+            job_future.add_done_callback(on_completed)
+
+    def get_job(self, job_id):
+        """ Get the UploadJob from the job id
+
+        Arguments:
+            job_id - the job id that was previously added to the controller
+
+        Returns:
+            The associated ImageUploadJob
+
+        Raises:
+            LookupError - Could not find the job id
+        """
+        if job_id not in self._jobs:
+            raise LookupError("Could not find job_id %s" % job_id)
+
+        return self._jobs[job_id]
+
+    def create_job(self, image_tasks, on_completed=None):
+        """ Create and start an ImageUploadJob from a list of ImageUploadTasks
+
+        Arguments:
+            image_tasks - a list of ImageUploadTasks
+            on_completed - a callback which is added to the job future
+
+        Returns:
+            An ImageUploadJob id
+        """
+        self._log.debug("Creating new job from %s image tasks", len(image_tasks))
+        new_job = ImageUploadJob(
+                self._log,
+                self._loop,
+                image_tasks,
+                job_id=next(self._job_id_gen)
+                )
+
+        self._add_job(new_job)
+        self._start_job(new_job, on_completed=on_completed)
+
+        return new_job.id
+
+
+class ImageUploadJob(object):
+    """ This class manages a set of ImageUploadTasks
+
+    In order to push an image (or set of images) to many cloud accounts and report a single
+    status for that operation, we need one status that represents all of those tasks.
+
+    The ImageUploadJob provides a single endpoint to control all the tasks and to report
+    when all images are successfully uploaded or when any one of them fails.
+    """
+    STATES = ("QUEUED", "IN_PROGRESS", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
+    TIMEOUT_JOB = 6 * 60 * 60  # 6 hours
+    JOB_GEN = itertools.count(1)
+
+    def __init__(self, log, loop, upload_tasks, job_id=None, timeout_job=TIMEOUT_JOB):
+        self._log = log
+        self._loop = loop
+        self._upload_tasks = upload_tasks
+        self._job_id = next(ImageUploadJob.JOB_GEN) if job_id is None else job_id
+        self._timeout_job = timeout_job
+
+        self._state = "QUEUED"
+        self._state_stack = [self._state]
+
+        self._start_time = time.time()
+        self._stop_time = 0
+
+        self._task_future_map = {}
+        self._job_future = None
+
+    def __repr__(self):
+        return "{}(job_id={}, state={})".format(
+                self.__class__.__name__, self._job_id, self._state
+                )
+
+    @property
+    def id(self):
+        return self._job_id
+
+    @property
+    def state(self):
+        """ The state of the ImageUploadJob """
+        return self._state
+
+    @state.setter
+    def state(self, new_state):
+        """ Set the state of the ImageUploadJob """
+        states = ImageUploadJob.STATES
+        assert new_state in states
+        assert states.index(new_state) >= states.index(self._state)
+        self._state_stack.append(new_state)
+
+        self._state = new_state
+
+    @property
+    def state_stack(self):
+        """ The list of states that this job progressed through """
+        return self._state_stack
+
+    @property
+    def pb_msg(self):
+        """ The UploadJob protobuf message """
+        task = RwImageMgmtYang.UploadJob.from_dict({
+            "id": self._job_id,
+            "status": self._state,
+            "start_time": self._start_time,
+            "upload_tasks": [task.pb_msg for task in self._upload_tasks]
+        })
+
+        if self._stop_time:
+            task.stop_time = self._stop_time
+
+        return task
+
+    def _start_upload_tasks(self):
+        self._log.debug("Starting %s upload tasks", len(self._upload_tasks))
+
+        for upload_task in self._upload_tasks:
+            upload_task.start()
+
+    @asyncio.coroutine
+    def _wait_for_upload_tasks(self):
+        self._log.debug("Waiting for upload tasks to complete")
+
+        wait_coroutines = [t.wait() for t in self._upload_tasks]
+        if wait_coroutines:
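+            # asyncio.wait() returns (done, pending) rather than raising on
+            # timeout; any task still not COMPLETED afterwards is treated as
+            # failed by _set_final_job_state().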
+            yield from asyncio.wait(
+                    wait_coroutines,
+                    timeout=self._timeout_job,
+                    loop=self._loop
+                    )
+
+        self._log.debug("All upload tasks completed")
+
+    def _set_final_job_state(self):
+        failed_tasks = []
+        for task in self._upload_tasks:
+            if task.state != "COMPLETED":
+                failed_tasks.append(task)
+
+        if failed_tasks:
+            self._log.error("%s had %s FAILED tasks.", self, len(failed_tasks))
+            self.state = "FAILED"
+        else:
+            self._log.debug("%s tasks completed successfully", len(self._upload_tasks))
+            self.state = "COMPLETED"
+
+    @asyncio.coroutine
+    def _cancel_job(self):
+        for task in self._upload_tasks:
+            task.stop()
+
+        # TODO: Wait for all tasks to actually reach terminal
+        # states.
+
+        self.state = "CANCELLED"
+
+    @asyncio.coroutine
+    def _do_job(self):
+        self.state = "IN_PROGRESS"
+        self._start_upload_tasks()
+        try:
+            yield from self._wait_for_upload_tasks()
+        except asyncio.CancelledError:
+            self._log.debug("%s was cancelled.  Cancelling all tasks.",
+                            self)
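+            # This coroutine is itself being cancelled, so schedule the child
+            # task cancellation on the loop instead of awaiting it here.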
+            self._loop.create_task(self._cancel_job())
+            raise
+        finally:
+            self._stop_time = time.time()
+            self._job_future = None
+
+        self._set_final_job_state()
+
+    @asyncio.coroutine
+    def wait(self):
+        """ Wait for the job to reach a terminal state """
+        if self._job_future is None:
+            raise UploadJobError("Job not started")
+
+        yield from asyncio.wait_for(
+                self._job_future,
+                self._timeout_job,
+                loop=self._loop
+                )
+
+    def start(self):
+        """ Start the job and all child tasks """
+        if self._state != "QUEUED":
+            raise UploadJobError("Job already started")
+
+        self._job_future = self._loop.create_task(self._do_job())
+        return self._job_future
+
+    def stop(self):
+        """ Stop the job and all child tasks  """
+        if self._job_future is not None:
+            self.state = "CANCELLING"
+            self._job_future.cancel()
+
+
+class ByteRateCalculator(object):
+    """ This class produces a smoothed byte rate from supplied measurements """
+    def __init__(self, rate_time_constant):
+        self._rate = 0
+        self._time_constant = rate_time_constant
+
+    @property
+    def rate(self):
+        return self._rate
+
+    def add_measurement(self, num_bytes, time_delta):
+        rate = num_bytes / time_delta
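+        # Smooth the instantaneous rate with a simple exponential moving
+        # average: each sample pulls the running rate toward it by
+        # 1/time_constant of the difference.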
+        if self._rate == 0:
+            self._rate = rate
+        else:
+            self._rate += ((rate - self._rate) / self._time_constant)
+
+        return self._rate
+
+
+class UploadProgressWriteProxy(object):
+    """ This class implements a write proxy which produces various progress stats
+
+    In order to keep the complexity of the UploadTask down, this class acts as a
+    proxy for a file write.  By providing the original handle to be written to
+    and having the client class call write() on this object, we can produce the
+    various statistics to be consumed.
+    """
+    RATE_TIME_CONSTANT = 5
+
+    def __init__(self, log, loop, bytes_total, write_hdl):
+        self._log = log
+        self._loop = loop
+        self._bytes_total = bytes_total
+        self._write_hdl = write_hdl
+
+        self._bytes_written = 0
+        self._byte_rate = 0
+
+        self._rate_calc = ByteRateCalculator(UploadProgressWriteProxy.RATE_TIME_CONSTANT)
+        self._rate_task = None
+
+    def write(self, data):
+        self._write_hdl.write(data)
+        self._bytes_written += len(data)
+
+    def close(self):
+        self._write_hdl.close()
+        if self._rate_task is not None:
+            self._log.debug("stopping rate monitoring task")
+            self._rate_task.cancel()
+
+    def start_rate_monitoring(self):
+        """ Start the rate monitoring task """
+        @asyncio.coroutine
+        def periodic_rate_task():
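+            # Sample bytes_written roughly once per second and feed the delta
+            # into the rate calculator.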
+            while True:
+                start_time = time.time()
+                start_bytes = self._bytes_written
+                yield from asyncio.sleep(1, loop=self._loop)
+                time_period = time.time() - start_time
+                num_bytes = self._bytes_written - start_bytes
+
+                self._byte_rate = self._rate_calc.add_measurement(num_bytes, time_period)
+
+        self._log.debug("starting rate monitoring task")
+        self._rate_task = self._loop.create_task(periodic_rate_task())
+
+    @property
+    def progress_percent(self):
+        if self._bytes_total == 0:
+            return 0
+
+        return int(self._bytes_written / self._bytes_total * 100)
+
+    @property
+    def bytes_written(self):
+        return self._bytes_written
+
+    @property
+    def bytes_total(self):
+        return self._bytes_total
+
+    @property
+    def bytes_rate(self):
+        return self._byte_rate
+
+
+class GlanceImagePipeGen(object):
+    """ This class produces a read file handle from a generator that produces bytes
+
+    The CAL API takes a file handle as an input.  The Glance API creates a generator
+    that produces byte strings.  This class acts as the mediator by creating a pipe
+    and pumping the byte strings from the generator into the write side of the pipe.
+
+    A pipe has the useful property here that writes block once the buffer is full until
+    the reader has consumed data.  This lets us pull from Glance and push at the pace of
+    the reader, avoiding having to store the images locally on disk.
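+
+    A rough usage sketch (names here are illustrative only):
+
+        pipe_gen = GlanceImagePipeGen(log, loop, glance_data_gen)
+        pipe_gen.start()                # pump bytes into the pipe from a daemon thread
+        image_hdl = pipe_gen.read_hdl   # hand the read side to the image upload task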
+    """
+    def __init__(self, log, loop, data_gen):
+        self._log = log
+        self._loop = loop
+        self._data_gen = data_gen
+
+        read_fd, write_fd = os.pipe()
+
+        self._read_hdl = os.fdopen(read_fd, 'rb')
+        self._write_hdl = os.fdopen(write_fd, 'wb')
+        self._close_hdl = self._write_hdl
+
+    @property
+    def write_hdl(self):
+        return self._write_hdl
+
+    @write_hdl.setter
+    def write_hdl(self, new_write_hdl):
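+        # Lets a caller swap in a wrapper around the write side (for example a
+        # progress-tracking write proxy) so _gen_writer pushes data through it.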
+        self._write_hdl = new_write_hdl
+
+    @property
+    def read_hdl(self):
+        return self._read_hdl
+
+    def _gen_writer(self):
+        self._log.debug("starting image data write to pipe")
+        try:
+            for data in self._data_gen:
+                try:
+                    self._write_hdl.write(data)
+                except (BrokenPipeError, ValueError) as e:
+                    self._log.warning("write pipe closed: %s", str(e))
+                    return
+
+        except Exception as e:
+            self._log.exception("error when writing data to pipe: %s", str(e))
+
+        finally:
+            self._log.debug("closing write side of pipe")
+            try:
+                self._write_hdl.close()
+            except OSError:
+                pass
+
+    def start(self):
+        t = threading.Thread(target=self._gen_writer)
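+        # Daemon thread: a writer blocked on a full pipe must never keep the
+        # process from exiting.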
+        t.daemon = True
+        t.start()
+
+    def stop(self):
+        self._log.debug("stop requested, closing write side of pipe")
+        self._write_hdl.close()
+
+
+class AccountImageUploadTask(object):
+    """ This class manages a create_image task built from image info and a file handle
+
+    It manages the upload of an image to a configured cloud account.
+    """
+    STATES = ("QUEUED", "CHECK_IMAGE_EXISTS", "UPLOADING", "CANCELLING", "CANCELLED", "COMPLETED", "FAILED")
+
+    TIMEOUT_CHECK_EXISTS = 10
+    TIMEOUT_IMAGE_UPLOAD = 6 * 60 * 60  # 6 hours
+
+    def __init__(self, log, loop, account, image_info, image_hdl,
+                 timeout_exists=TIMEOUT_CHECK_EXISTS, timeout_upload=TIMEOUT_IMAGE_UPLOAD,
+                 progress_info=None, write_canceller=None
+                 ):
+        self._log = log
+        self._loop = loop
+        self._account = account
+        self._image_info = image_info.deep_copy()
+        self._image_hdl = image_hdl
+
+        self._timeout_exists = timeout_exists
+        self._timeout_upload = timeout_upload
+
+        self._progress_info = progress_info
+        self._write_canceller = write_canceller
+
+        self._state = "QUEUED"
+        self._state_stack = [self._state]
+
+        self._detail = "Task is waiting to be started"
+        self._start_time = time.time()
+        self._stop_time = 0
+        self._upload_future = None
+
+        if not image_info.has_field("name"):
+            raise ValueError("image info must have name field")
+
+    @property
+    def state(self):
+        return self._state
+
+    @state.setter
+    def state(self, new_state):
+        states = AccountImageUploadTask.STATES
+        assert new_state in states
+        assert states.index(new_state) >= states.index(self._state)
+        self._state_stack.append(new_state)
+
+        self._state = new_state
+
+    @property
+    def state_stack(self):
+        return self._state_stack
+
+    @property
+    def image_id(self):
+        """ The image id being uploaded """
+        return self._image_info.id
+
+    @property
+    def image_name(self):
+        """ The image name being uploaded """
+        return self._image_info.name
+
+    @property
+    def image_checksum(self):
+        """ The image checksum being uploaded """
+        if self._image_info.has_field("checksum"):
+            return self._image_info.checksum
+
+        return None
+
+    @property
+    def cloud_account(self):
+        """ The cloud account name which the image is being uploaded to """
+        return self._account.name
+
+    @property
+    def pb_msg(self):
+        """ The UploadTask protobuf message """
+        task = RwImageMgmtYang.UploadTask.from_dict({
+            "cloud_account": self.cloud_account,
+            "image_id": self.image_id,
+            "image_name": self.image_name,
+            "status": self.state,
+            "detail": self._detail,
+            "start_time": self._start_time,
+        })
+
+        if self.image_checksum is not None:
+            task.image_checksum = self.image_checksum
+
+        if self._stop_time:
+            task.stop_time = self._stop_time
+
+        if self._progress_info:
+            task.bytes_written = self._progress_info.bytes_written
+            task.bytes_total = self._progress_info.bytes_total
+            task.progress_percent = self._progress_info.progress_percent
+            task.bytes_per_second = self._progress_info.bytes_rate
+
+        if self.state == "COMPLETED":
+            task.progress_percent = 100
+
+        return task
+
+    def _get_account_images(self):
+        account_images = []
+        self._log.debug("getting image list for account {}".format(self._account.name))
+        try:
+            account_images = self._account.get_image_list()
+        except rift.mano.cloud.CloudAccountCalError as e:
+            msg = "could not get image list for account {}".format(self._account.name)
+            self._log.error(msg)
+            raise ImageListError(msg) from e
+
+        return account_images
+
+    def _has_existing_image(self):
+        account = self._account
+
+        account_images = self._get_account_images()
+
+        matching_images = [i for i in account_images if i.name == self.image_name]
+
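+        # Only filter on checksum when one was supplied; otherwise a name
+        # match alone counts as an existing image.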
+        if self.image_checksum is not None:
+            matching_images = [i for i in matching_images if i.checksum == self.image_checksum]
+
+        if matching_images:
+            self._log.debug("found matching image with checksum in account %s",
+                            account.name)
+            return True
+
+        self._log.debug("did not find matching image with checksum in account %s",
+                        account.name)
+        return False
+
+    def _upload_image(self):
+        image = self._image_info
+        account = self._account
+
+        image.fileno = self._image_hdl.fileno()
+
+        self._log.debug("uploading to account {}: {}".format(account.name, image))
+        try:
+            image.id = account.create_image(image)
+        except rift.mano.cloud.CloudAccountCalError as e:
+            msg = "error when uploading image {} to cloud account: {}".format(image.name, str(e))
+            self._log.error(msg)
+            raise ImageUploadError(msg) from e
+
+        self._log.debug('uploaded image (id: {}) to account {}: {}'.format(
+                        image.id, account.name, image.name))
+
+        return image.id
+
+    @asyncio.coroutine
+    def _do_upload(self):
+        try:
+            self.state = "CHECK_IMAGE_EXISTS"
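+            # The CAL image calls block, so run them in the default thread-pool
+            # executor and bound each one with a timeout.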
+            has_image = yield from asyncio.wait_for(
+                    self._loop.run_in_executor(None, self._has_existing_image),
+                    timeout=self._timeout_exists,
+                    loop=self._loop
+                    )
+            if has_image:
+                self.state = "COMPLETED"
+                self._detail = "Image already exists on destination"
+                return
+
+            self.state = "UPLOADING"
+            self._detail = "Uploading image"
+
+            # Note that if the upload times out, the upload thread may still
+            # stick around.  We'll need another method of cancelling the task
+            # through the VALA interface.
+            image_id = yield from asyncio.wait_for(
+                    self._loop.run_in_executor(None, self._upload_image),
+                    timeout=self._timeout_upload,
+                    loop=self._loop
+                    )
+
+        except asyncio.CancelledError as e:
+            self.state = "CANCELLED"
+            self._detail = "Image upload cancelled"
+
+        except ImageUploadTaskError as e:
+            self.state = "FAILED"
+            self._detail = str(e)
+
+        except asyncio.TimeoutError as e:
+            self.state = "FAILED"
+            self._detail = "Timed out during upload task: %s" % str(e)
+
+        else:
+            # If the user does not provide a checksum and performs a URL source
+            # upload with an incorrect URL, then Glance does not indicate a failure
+            # and the CAL cannot detect an incorrect upload.  In this case, use
+            # the bytes_written to detect a bad upload and mark the task as failed.
+            if self._progress_info and self._progress_info.bytes_written == 0:
+                self.state = "FAILED"
+                self._detail = "No bytes written.  Possible bad image source."
+                return
+
+            self.state = "COMPLETED"
+            self._detail = "Image successfully uploaded.  Image id: %s" % image_id
+
+        finally:
+            self._stop_time = time.time()
+            self._upload_future = None
+
+    @asyncio.coroutine
+    def wait(self):
+        """ Wait for the upload task to complete """
+        if self._upload_future is None:
+            raise ImageUploadError("Task not started")
+
+        yield from asyncio.wait_for(
+                self._upload_future,
+                self._timeout_upload, loop=self._loop
+                )
+
+    def start(self):
+        """ Start the upload task """
+        if self._state != "QUEUED":
+            raise ImageUploadError("Task already started")
+
+        self._log.info("Starting %s", self)
+
+        self._upload_future = self._loop.create_task(self._do_upload())
+
+        return self._upload_future
+
+    def stop(self):
+        """ Stop the upload task in progress """
+        if self._upload_future is None:
+            self._log.warning("Cannot cancel %s.  Not in progress.", self)
+            return
+
+        self.state = "CANCELLING"
+        self._detail = "Cancellation has been requested"
+
+        self._log.info("Cancelling %s", self)
+        self._upload_future.cancel()
+        if self._write_canceller is not None:
+            self._write_canceller.stop()