RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / imagemgr / client.py
diff --git a/rwlaunchpad/plugins/rwimagemgr/rift/imagemgr/client.py b/rwlaunchpad/plugins/rwimagemgr/rift/imagemgr/client.py
new file mode 100644 (file)
index 0000000..10df45b
--- /dev/null
@@ -0,0 +1,174 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import concurrent.futures
+
+import gi
+gi.require_version("RwImageMgmtYang", "1.0")
+from gi.repository import (
+    RwImageMgmtYang,
+)
+
+
+class UploadJobError(Exception):
+    pass
+
+
+class UploadJobFailed(UploadJobError):
+    pass
+
+
+class UploadJobCancelled(UploadJobFailed):
+    pass
+
+
+class UploadJobClient(object):
+    """ An upload job DTS client
+
+    This class wraps the DTS upload job actions to be more easily reused across
+    various components
+    """
+    def __init__(self, log, loop, dts):
+        self._log = log
+        self._loop = loop
+        self._dts = dts
+
+    def create_job(self, image_name, image_checksum, cloud_account_names=None):
+        """ Create an image upload_job and return an UploadJob instance
+
+        Arguments:
+            image_name - The name of the image in the image catalog
+            image_checksum - The checksum of the image in the catalog
+            cloud_account_names - Names of the cloud accounts to upload the image to.
+                                  None uploads the image to all cloud accounts.
+
+        Returns:
+            An UploadJob instance
+        """
+        create_job_msg = RwImageMgmtYang.CreateUploadJob.from_dict({
+            "onboarded_image": {
+                "image_name": image_name,
+                "image_checksum": image_checksum,
+                }
+            })
+
+        if cloud_account_names is not None:
+            create_job_msg.cloud_account = cloud_account_names
+
+        query_iter = yield from self._dts.query_rpc(
+                "I,/rw-image-mgmt:create-upload-job",
+                0,
+                create_job_msg,
+                )
+
+        for fut_resp in query_iter:
+            rpc_result = (yield from fut_resp).result
+
+            job_id = rpc_result.job_id
+
+        return UploadJob(self._log, self._loop, self._dts, job_id)
+
+    def create_job_threadsafe(self, image_name, image_checksum, cloud_account_names=None):
+        """ A thread-safe, syncronous wrapper for create_job """
+        future = concurrent.futures.Future()
+
+        def on_done(asyncio_future):
+            if asyncio_future.exception() is not None:
+                future.set_exception(asyncio_future.exception())
+
+            elif asyncio_future.result() is not None:
+                future.set_result(asyncio_future.result())
+
+        def add_task():
+            task = self._loop.create_task(
+                    self.create_job(image_name, image_checksum, cloud_account_names)
+                    )
+            task.add_done_callback(on_done)
+
+        self._loop.call_soon_threadsafe(add_task)
+        return future.result()
+
+
+class UploadJob(object):
+    """ A handle for a image upload job """
+    def __init__(self, log, loop, dts, job_id):
+        self._log = log
+        self._loop = loop
+        self._dts = dts
+        self._job_id = job_id
+
+    @asyncio.coroutine
+    def wait_until_complete(self):
+        """ Wait until the upload job reaches a terminal state
+
+        Raises:
+            UploadJobError: A generic exception occured in the upload job
+            UploadJobFailed: The upload job failed
+            UploadJobCancelled: The upload job was cancelled
+        """
+        self._log.debug("waiting for upload job %s to complete", self._job_id)
+        while True:
+            query_iter = yield from self._dts.query_read(
+                "D,/rw-image-mgmt:upload-jobs/rw-image-mgmt:job[rw-image-mgmt:id='{}']".format(
+                    self._job_id
+                )
+            )
+            job_status_msg = None
+            for fut_resp in query_iter:
+                job_status_msg = (yield from fut_resp).result
+                break
+
+            if job_status_msg is None:
+                raise UploadJobError("did not get a status response for job_id: %s",
+                                     self._job_id)
+
+            if job_status_msg.status == "COMPLETED":
+                msg = "upload job %s completed successfully" % self._job_id
+                self._log.debug(msg)
+                return
+
+            elif job_status_msg.status == "FAILED":
+                msg = "upload job %s as not successful: %s" % (self._job_id, job_status_msg.status)
+                self._log.error(msg)
+                raise UploadJobFailed(msg)
+
+            elif job_status_msg.status == "CANCELLED":
+                msg = "upload job %s was cancelled" % self._job_id
+                self._log.error(msg)
+                raise UploadJobCancelled(msg)
+
+            yield from asyncio.sleep(.5, loop=self._loop)
+
+    def wait_until_complete_threadsafe(self):
+        """ A thread-safe, synchronous wrapper for wait_until_complete """
+
+        future = concurrent.futures.Future()
+
+        def on_done(asyncio_future):
+            if asyncio_future.exception() is not None:
+                future.set_exception(asyncio_future.exception())
+                return
+
+            future.set_result(asyncio_future.result())
+
+        def add_task():
+            task = self._loop.create_task(self.wait_until_complete())
+            task.add_done_callback(on_done)
+
+        self._loop.call_soon_threadsafe(add_task)
+        return future.result()