RIFT OSM R1 Initial Submission
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / tasklet.py
diff --git a/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py b/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py
new file mode 100644 (file)
index 0000000..027e582
--- /dev/null
@@ -0,0 +1,535 @@
+
+# 
+#   Copyright 2016 RIFT.IO Inc
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+
+import asyncio
+import os
+import threading
+import time
+
+import rift.tasklets
+import rift.mano.cloud
+
+from . import glance_proxy_server
+from . import glance_client
+from . import upload
+
+import gi
+gi.require_version('RwImageMgmtYang', '1.0')
+gi.require_version('RwLaunchpadYang', '1.0')
+gi.require_version('RwDts', '1.0')
+
+from gi.repository import (
+    RwcalYang,
+    RwDts as rwdts,
+    RwImageMgmtYang,
+    RwLaunchpadYang,
+)
+
+
+class ImageRequestError(Exception):
+    pass
+
+
+class AccountNotFoundError(ImageRequestError):
+    pass
+
+
+class ImageNotFoundError(ImageRequestError):
+    pass
+
+
+class CloudAccountDtsHandler(object):
+    def __init__(self, log, dts, log_hdl):
+        self._dts = dts
+        self._log = log
+        self._log_hdl = log_hdl
+        self._cloud_cfg_subscriber = None
+
+    def register(self, on_add_apply, on_delete_apply):
+        self._log.debug("creating cloud account config handler")
+        self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
+                self._dts, self._log, self._log_hdl,
+                rift.mano.cloud.CloudAccountConfigCallbacks(
+                    on_add_apply=on_add_apply,
+                    on_delete_apply=on_delete_apply,
+                    )
+                )
+        self._cloud_cfg_subscriber.register()
+
+
+def openstack_image_to_image_info(openstack_image):
+    """Convert the OpenstackImage to a ImageInfo protobuf message
+
+    Arguments:
+        openstack_image - A OpenstackImage instance
+
+    Returns:
+        A ImageInfo CAL protobuf message
+    """
+
+    image_info = RwcalYang.ImageInfoItem()
+
+    copy_fields = ["id", "name", "checksum", "container_format", "disk_format"]
+    for field in copy_fields:
+        value = getattr(openstack_image, field)
+        setattr(image_info, field, value)
+
+    image_info.state = openstack_image.status
+
+    return image_info
+
+
+class ImageDTSShowHandler(object):
+    """ A DTS publisher for the upload-jobs data container """
+    def __init__(self, log, loop, dts, job_controller):
+        self._log = log
+        self._loop = loop
+        self._dts = dts
+        self._job_controller = job_controller
+
+        self._subscriber = None
+
+    @asyncio.coroutine
+    def register(self):
+        """ Register as a publisher and wait for reg_ready to complete """
+        def get_xpath():
+            return "D,/rw-image-mgmt:upload-jobs"
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, action, ks_path, msg):
+            if action != rwdts.QueryAction.READ:
+                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+                return
+
+            jobs_pb_msg = self._job_controller.pb_msg
+
+            xact_info.respond_xpath(
+                    rwdts.XactRspCode.ACK,
+                    xpath=get_xpath(),
+                    msg=jobs_pb_msg,
+                    )
+
+        reg_event = asyncio.Event(loop=self._loop)
+
+        @asyncio.coroutine
+        def on_ready(regh, status):
+            reg_event.set()
+
+        self._subscriber = yield from self._dts.register(
+                xpath=get_xpath(),
+                handler=rift.tasklets.DTS.RegistrationHandler(
+                    on_prepare=on_prepare,
+                    on_ready=on_ready,
+                    ),
+                flags=rwdts.Flag.PUBLISHER,
+                )
+
+        yield from reg_event.wait()
+
+
+class ImageDTSRPCHandler(object):
+    """ A DTS publisher for the upload-job RPC's """
+    def __init__(self, log, loop, dts, accounts, glance_client, upload_task_creator, job_controller):
+        self._log = log
+        self._loop = loop
+        self._dts = dts
+        self._accounts = accounts
+        self._glance_client = glance_client
+        self._upload_task_creator = upload_task_creator
+        self._job_controller = job_controller
+
+        self._subscriber = None
+
+    @asyncio.coroutine
+    def _register_create_upload_job(self):
+        def get_xpath():
+            return "/rw-image-mgmt:create-upload-job"
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, action, ks_path, msg):
+            create_msg = msg
+
+            account_names = create_msg.cloud_account
+            # If cloud accounts were not specified, upload image to all cloud account
+            if not account_names:
+                account_names = list(self._accounts.keys())
+
+            for account_name in account_names:
+                if account_name not in self._accounts:
+                    raise AccountNotFoundError("Could not find account %s", account_name)
+
+            if create_msg.has_field("external_url"):
+                glance_image = yield from self._upload_task_creator.create_glance_image_from_url_create_rpc(
+                        account_names, create_msg.external_url
+                        )
+
+                tasks = yield from self._upload_task_creator.create_tasks_from_glance_id(
+                    account_names, glance_image.id
+                    )
+
+                def delete_image(ft):
+                    try:
+                        self._glance_client.delete_image_from_id(glance_image.id)
+                    except glance_client.OpenstackImageDeleteError:
+                        pass
+
+                # Create a job and when the job completes delete the temporary
+                # image from the catalog.
+                job_id = self._job_controller.create_job(
+                        tasks,
+                        on_completed=delete_image
+                        )
+
+            elif create_msg.has_field("onboarded_image"):
+                tasks = yield from self._upload_task_creator.create_tasks_from_onboarded_create_rpc(
+                    account_names, create_msg.onboarded_image
+                    )
+                job_id = self._job_controller.create_job(tasks)
+
+            else:
+                raise ImageRequestError("an image selection must be provided")
+
+            rpc_out_msg = RwImageMgmtYang.CreateUploadJobOutput(job_id=job_id)
+
+            xact_info.respond_xpath(
+                    rwdts.XactRspCode.ACK,
+                    xpath="O," + get_xpath(),
+                    msg=rpc_out_msg,
+                    )
+
+        reg_event = asyncio.Event(loop=self._loop)
+
+        @asyncio.coroutine
+        def on_ready(_, status):
+            reg_event.set()
+
+        self._subscriber = yield from self._dts.register(
+                xpath="I," + get_xpath(),
+                handler=rift.tasklets.DTS.RegistrationHandler(
+                    on_prepare=on_prepare,
+                    on_ready=on_ready,
+                    ),
+                flags=rwdts.Flag.PUBLISHER,
+                )
+
+        yield from reg_event.wait()
+
+    @asyncio.coroutine
+    def _register_cancel_upload_job(self):
+        def get_xpath():
+            return "/rw-image-mgmt:cancel-upload-job"
+
+        @asyncio.coroutine
+        def on_prepare(xact_info, action, ks_path, msg):
+            if not msg.has_field("job_id"):
+                self._log.error("cancel-upload-job missing job-id field.")
+                xact_info.respond_xpath(rwdts.XactRspCode.NACK)
+                return
+
+            job_id = msg.job_id
+
+            job = self._job_controller.get_job(job_id)
+            job.stop()
+
+            xact_info.respond_xpath(
+                    rwdts.XactRspCode.ACK,
+                    xpath="O," + get_xpath(),
+                    )
+
+        reg_event = asyncio.Event(loop=self._loop)
+
+        @asyncio.coroutine
+        def on_ready(_, status):
+            reg_event.set()
+
+        self._subscriber = yield from self._dts.register(
+                xpath="I," + get_xpath(),
+                handler=rift.tasklets.DTS.RegistrationHandler(
+                    on_prepare=on_prepare,
+                    on_ready=on_ready,
+                    ),
+                flags=rwdts.Flag.PUBLISHER,
+                )
+
+        yield from reg_event.wait()
+
+    @asyncio.coroutine
+    def register(self):
+        """ Register for RPC's and wait for all registrations to complete """
+        yield from self._register_create_upload_job()
+        yield from self._register_cancel_upload_job()
+
+
+class GlanceClientUploadTaskCreator(object):
+    """ This class creates upload tasks using configured cloud accounts and
+    configured image catalog glance client """
+
+    def __init__(self, log, loop, accounts, glance_client):
+        self._log = log
+        self._loop = loop
+        self._accounts = accounts
+        self._glance_client = glance_client
+
+    @asyncio.coroutine
+    def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None):
+        """ Create a list of UploadTasks for a list of cloud accounts
+        and a image with a matching image_name and image_checksum in the
+        catalog
+
+        Arguments:
+            account_names - A list of configured cloud account names
+            image_id - A image id
+            image_name - A image name
+            image_checksum - A image checksum
+
+        Returns:
+            A list of AccountImageUploadTask instances
+
+        Raises:
+            ImageNotFoundError - Could not find a matching image in the
+                image catalog
+
+            AccountNotFoundError - Could not find an account that matched
+                the provided account name
+        """
+        try:
+            image = yield from asyncio.wait_for(
+                    self._loop.run_in_executor(
+                            None,
+                            self._glance_client.find_active_image,
+                            image_id,
+                            image_name,
+                            image_checksum,
+                            ),
+                    timeout=5,
+                    loop=self._loop,
+                    )
+
+        except glance_client.OpenstackImageError as e:
+            msg = "Could not find image in Openstack to upload"
+            self._log.exception(msg)
+            raise ImageNotFoundError(msg) from e
+
+        image_info = openstack_image_to_image_info(image)
+        self._log.debug("created image info: %s", image_info)
+
+        tasks = []
+        for account_name in account_names:
+            if account_name not in self._accounts:
+                raise AccountNotFoundError("Could not find account %s", account_name)
+
+        # For each account name provided, create a pipe (GlanceImagePipeGen)
+        # which feeds data into the UploadTask while also monitoring the various
+        # transmit stats (progress, bytes written, bytes per second, etc)
+        for account_name in account_names:
+            account = self._accounts[account_name]
+            self._log.debug("creating task for account %s", account.name)
+            glance_data_gen = self._glance_client.get_image_data(image_info.id)
+
+            pipe_gen = upload.GlanceImagePipeGen(self._log, self._loop, glance_data_gen)
+            progress_pipe = upload.UploadProgressWriteProxy(
+                    self._log, self._loop, image.size, pipe_gen.write_hdl
+                    )
+            progress_pipe.start_rate_monitoring()
+            pipe_gen.write_hdl = progress_pipe
+            pipe_gen.start()
+
+            task = upload.AccountImageUploadTask(
+                    self._log, self._loop, account, image_info, pipe_gen.read_hdl,
+                    progress_info=progress_pipe, write_canceller=pipe_gen,
+                    )
+            tasks.append(task)
+            self._log.debug("task created: %s", task)
+
+        return tasks
+
+    @asyncio.coroutine
+    def create_glance_image_from_url_create_rpc(self, account_names, create_msg):
+        if "image_url" not in create_msg:
+            raise ValueError("image_url must be specified")
+
+        if "image_id" in create_msg:
+            raise ImageRequestError("Cannot specify both image_url and image_id")
+
+        if "image_name" not in create_msg:
+            raise ImageRequestError("image_name must be specified when image_url is provided")
+
+        glance_image = yield from asyncio.wait_for(
+                self._loop.run_in_executor(
+                    None,
+                    self._glance_client.create_image_from_url,
+                    create_msg.image_url,
+                    create_msg.image_name,
+                    create_msg.image_checksum if "image_checksum" in create_msg else None,
+                    create_msg.disk_format if "disk_format" in create_msg else None,
+                    create_msg.container_format if "container_format" in create_msg else None,
+                    ),
+                timeout=5,
+                loop=self._loop,
+                )
+
+        return glance_image
+
+    @asyncio.coroutine
+    def create_tasks_from_glance_id(self, account_names, glance_image_id):
+        return (yield from self.create_tasks(account_names, glance_image_id))
+
+    @asyncio.coroutine
+    def create_tasks_from_onboarded_create_rpc(self, account_names, create_msg):
+        return (yield from self.create_tasks(
+            account_names,
+            create_msg.image_id if "image_id" in create_msg else None,
+            create_msg.image_name if "image_name" in create_msg else None,
+            create_msg.image_checksum if "image_checksum" in create_msg else None)
+            )
+
+
+class ImageManagerTasklet(rift.tasklets.Tasklet):
+    """
+    The RwImageMgrTasklet provides a interface for DTS to interact with an
+    instance of the Monitor class. This allows the Monitor class to remain
+    independent of DTS.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.rwlog.set_category("rw-mano-log")
+
+        self.cloud_cfg_subscriber = None
+        self.http_proxy = None
+        self.proxy_server = None
+        self.dts = None
+        self.job_controller = None
+        self.cloud_accounts = {}
+        self.glance_client = None
+        self.task_creator = None
+        self.rpc_handler = None
+        self.show_handler = None
+
+    def start(self):
+        super().start()
+        self.log.info("Starting Image Manager Tasklet")
+
+        self.log.debug("Registering with dts")
+        self.dts = rift.tasklets.DTS(
+                self.tasklet_info,
+                RwImageMgmtYang.get_schema(),
+                self.loop,
+                self.on_dts_state_change
+                )
+
+        self.log.debug("Created DTS Api GI Object: %s", self.dts)
+
+    def stop(self):
+        try:
+            self.dts.deinit()
+        except Exception as e:
+            self.log.exception(e)
+
+    @asyncio.coroutine
+    def init(self):
+        try:
+            self.log.debug("creating cloud account handler")
+            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self.log, self.dts, self.log_hdl)
+            self.cloud_cfg_subscriber.register(
+                    self.on_cloud_account_create,
+                    self.on_cloud_account_delete
+                    )
+
+            self.log.debug("creating http proxy server")
+
+            self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop)
+
+            self.proxy_server = glance_proxy_server.GlanceHTTPProxyServer(
+                    self.log, self.loop, self.http_proxy
+                    )
+            self.proxy_server.start()
+
+            self.job_controller = upload.ImageUploadJobController(
+                    self.log, self.loop
+                    )
+
+            self.glance_client = glance_client.OpenstackGlanceClient.from_token(
+                    self.log, "127.0.0.1", "9292", "test"
+                    )
+
+            self.task_creator = GlanceClientUploadTaskCreator(
+                    self.log, self.loop, self.cloud_accounts, self.glance_client
+                    )
+
+            self.rpc_handler = ImageDTSRPCHandler(
+                    self.log, self.loop, self.dts, self.cloud_accounts, self.glance_client, self.task_creator,
+                    self.job_controller
+                    )
+            yield from self.rpc_handler.register()
+
+            self.show_handler = ImageDTSShowHandler(
+                    self.log, self.loop, self.dts, self.job_controller
+                    )
+            yield from self.show_handler.register()
+
+        except Exception as e:
+            self.log.exception("error during init")
+
+    def on_cloud_account_create(self, account):
+        self.log.debug("adding cloud account: %s", account.name)
+        self.cloud_accounts[account.name] = account
+
+    def on_cloud_account_delete(self, account_name):
+        self.log.debug("deleting cloud account: %s", account_name)
+        if account_name not in self.cloud_accounts:
+            self.log.warning("cloud account not found: %s", account_name)
+
+        del self.cloud_accounts[account_name]
+
+    @asyncio.coroutine
+    def run(self):
+        pass
+
+    def on_instance_started(self):
+        self.log.debug("Got instance started callback")
+
+    @asyncio.coroutine
+    def on_dts_state_change(self, state):
+        """Handle DTS state change
+
+        Take action according to current DTS state to transition application
+        into the corresponding application state
+
+        Arguments
+            state - current dts state
+
+        """
+        switch = {
+            rwdts.State.INIT: rwdts.State.REGN_COMPLETE,
+            rwdts.State.CONFIG: rwdts.State.RUN,
+        }
+
+        handlers = {
+            rwdts.State.INIT: self.init,
+            rwdts.State.RUN: self.run,
+        }
+
+        # Transition application to next state
+        handler = handlers.get(state, None)
+        if handler is not None:
+            yield from handler()
+
+        # Transition dts to next state
+        next_state = switch.get(state, None)
+        if next_state is not None:
+            self.dts.handle.set_state(next_state)