update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / tasklet.py
index 027e582..9ea9cbc 100644 (file)
@@ -22,6 +22,13 @@ import time
 
 import rift.tasklets
 import rift.mano.cloud
+from rift.mano.utils.project import (
+    ManoProject,
+    ProjectConfigCallbacks,
+    ProjectHandler,
+    get_add_delete_update_cfgs,
+    DEFAULT_PROJECT,
+    )
 
 from . import glance_proxy_server
 from . import glance_client
@@ -53,22 +60,30 @@ class ImageNotFoundError(ImageRequestError):
 
 
 class CloudAccountDtsHandler(object):
-    def __init__(self, log, dts, log_hdl):
+    def __init__(self, log, dts, log_hdl, project):
         self._dts = dts
         self._log = log
         self._log_hdl = log_hdl
         self._cloud_cfg_subscriber = None
+        self._project = project
 
+    @asyncio.coroutine
     def register(self, on_add_apply, on_delete_apply):
-        self._log.debug("creating cloud account config handler")
+        self._log.debug("Project {}: creating cloud account config handler".
+                        format(self._project.name))
         self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
-                self._dts, self._log, self._log_hdl,
+                self._dts, self._log, self._log_hdl, self._project,
                 rift.mano.cloud.CloudAccountConfigCallbacks(
                     on_add_apply=on_add_apply,
                     on_delete_apply=on_delete_apply,
                     )
                 )
-        self._cloud_cfg_subscriber.register()
+        yield from self._cloud_cfg_subscriber.register()
+
+    def deregister(self):
+        self._log.debug("Project {}: Removing cloud account config handler".
+                        format(self._project.name))
+        self._cloud_cfg_subscriber.deregister()
 
 
 def openstack_image_to_image_info(openstack_image):
@@ -81,13 +96,19 @@ def openstack_image_to_image_info(openstack_image):
         A ImageInfo CAL protobuf message
     """
 
-    image_info = RwcalYang.ImageInfoItem()
+    image_info = RwcalYang.YangData_RwProject_Project_VimResources_ImageinfoList()
 
     copy_fields = ["id", "name", "checksum", "container_format", "disk_format"]
     for field in copy_fields:
         value = getattr(openstack_image, field)
         setattr(image_info, field, value)
 
+    value = getattr(openstack_image, "properties")
+    for key in value:
+        prop = image_info.properties.add()
+        prop.name = key
+        prop.property_value = value[key]
+
     image_info.state = openstack_image.status
 
     return image_info
@@ -95,19 +116,21 @@ def openstack_image_to_image_info(openstack_image):
 
 class ImageDTSShowHandler(object):
     """ A DTS publisher for the upload-jobs data container """
-    def __init__(self, log, loop, dts, job_controller):
-        self._log = log
-        self._loop = loop
-        self._dts = dts
+    def __init__(self, project, job_controller):
+        self._log = project.log
+        self._loop = project.loop
+        self._dts = project.dts
         self._job_controller = job_controller
+        self._project = project
 
         self._subscriber = None
 
+    def get_xpath(self):
+        return self._project.add_project("D,/rw-image-mgmt:upload-jobs")
+
     @asyncio.coroutine
     def register(self):
         """ Register as a publisher and wait for reg_ready to complete """
-        def get_xpath():
-            return "D,/rw-image-mgmt:upload-jobs"
 
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
@@ -119,7 +142,7 @@ class ImageDTSShowHandler(object):
 
             xact_info.respond_xpath(
                     rwdts.XactRspCode.ACK,
-                    xpath=get_xpath(),
+                    xpath=self.get_xpath(),
                     msg=jobs_pb_msg,
                     )
 
@@ -130,7 +153,7 @@ class ImageDTSShowHandler(object):
             reg_event.set()
 
         self._subscriber = yield from self._dts.register(
-                xpath=get_xpath(),
+                xpath=self.get_xpath(),
                 handler=rift.tasklets.DTS.RegistrationHandler(
                     on_prepare=on_prepare,
                     on_ready=on_ready,
@@ -141,18 +164,31 @@ class ImageDTSShowHandler(object):
         yield from reg_event.wait()
 
 
+    def deregister(self):
+        self._log.debug("Project {}: De-register show image handler".
+                        format(self._project.name))
+        if self._subscriber:
+            self._subscriber.delete_element(self.get_xpath())
+            self._subscriber.deregister()
+            self._subscriber = None
+
 class ImageDTSRPCHandler(object):
     """ A DTS publisher for the upload-job RPC's """
-    def __init__(self, log, loop, dts, accounts, glance_client, upload_task_creator, job_controller):
-        self._log = log
-        self._loop = loop
-        self._dts = dts
-        self._accounts = accounts
+    def __init__(self, project, glance_client, upload_task_creator, job_controller):
+        self._log = project.log
+        self._loop = project.loop
+        self._dts = project.dts
         self._glance_client = glance_client
         self._upload_task_creator = upload_task_creator
         self._job_controller = job_controller
+        self._project = project
 
-        self._subscriber = None
+        self._create = None
+        self._cancel = None
+
+    @property
+    def accounts(self):
+        return self._project.cloud_accounts
 
     @asyncio.coroutine
     def _register_create_upload_job(self):
@@ -164,13 +200,20 @@ class ImageDTSRPCHandler(object):
             create_msg = msg
 
             account_names = create_msg.cloud_account
+
+            self._log.debug("Create upload job  msg: {} ".format(msg.as_dict()))
+
+            if not self._project.rpc_check(msg, xact_info):
+                return
+
             # If cloud accounts were not specified, upload image to all cloud account
             if not account_names:
-                account_names = list(self._accounts.keys())
+                account_names = list(self.accounts.keys())
 
-            for account_name in account_names:
-                if account_name not in self._accounts:
-                    raise AccountNotFoundError("Could not find account %s", account_name)
+            else:
+                for account_name in account_names:
+                    if account_name not in self.accounts:
+                        raise AccountNotFoundError("Could not find account %s", account_name)
 
             if create_msg.has_field("external_url"):
                 glance_image = yield from self._upload_task_creator.create_glance_image_from_url_create_rpc(
@@ -195,6 +238,8 @@ class ImageDTSRPCHandler(object):
                         )
 
             elif create_msg.has_field("onboarded_image"):
+                self._log.debug("onboarded_image {} to accounts {}".
+                                format(create_msg.onboarded_image, account_names))
                 tasks = yield from self._upload_task_creator.create_tasks_from_onboarded_create_rpc(
                     account_names, create_msg.onboarded_image
                     )
@@ -203,7 +248,7 @@ class ImageDTSRPCHandler(object):
             else:
                 raise ImageRequestError("an image selection must be provided")
 
-            rpc_out_msg = RwImageMgmtYang.CreateUploadJobOutput(job_id=job_id)
+            rpc_out_msg = RwImageMgmtYang.YangOutput_RwImageMgmt_CreateUploadJob(job_id=job_id)
 
             xact_info.respond_xpath(
                     rwdts.XactRspCode.ACK,
@@ -217,14 +262,14 @@ class ImageDTSRPCHandler(object):
         def on_ready(_, status):
             reg_event.set()
 
-        self._subscriber = yield from self._dts.register(
-                xpath="I," + get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare,
-                    on_ready=on_ready,
-                    ),
-                flags=rwdts.Flag.PUBLISHER,
-                )
+        self._create = yield from self._dts.register(
+            xpath="I," + get_xpath(),
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=on_prepare,
+                on_ready=on_ready,
+            ),
+            flags=rwdts.Flag.PUBLISHER,
+        )
 
         yield from reg_event.wait()
 
@@ -235,6 +280,9 @@ class ImageDTSRPCHandler(object):
 
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
+            if not self._project.rpc_check(msg, xact_info):
+                return
+
             if not msg.has_field("job_id"):
                 self._log.error("cancel-upload-job missing job-id field.")
                 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
@@ -256,14 +304,14 @@ class ImageDTSRPCHandler(object):
         def on_ready(_, status):
             reg_event.set()
 
-        self._subscriber = yield from self._dts.register(
-                xpath="I," + get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare,
-                    on_ready=on_ready,
-                    ),
-                flags=rwdts.Flag.PUBLISHER,
-                )
+        self._cancel = yield from self._dts.register(
+            xpath="I," + get_xpath(),
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=on_prepare,
+                on_ready=on_ready,
+            ),
+            flags=rwdts.Flag.PUBLISHER,
+        )
 
         yield from reg_event.wait()
 
@@ -273,16 +321,31 @@ class ImageDTSRPCHandler(object):
         yield from self._register_create_upload_job()
         yield from self._register_cancel_upload_job()
 
+    def deregister(self):
+        self._log.debug("Project {}: Deregister image rpc handlers".
+                        format(self._project.name))
+        if self._create:
+            self._create.deregister()
+            self._create = None
+
+        if self._cancel:
+            self._cancel.deregister()
+            self._cancel = None
+
 
 class GlanceClientUploadTaskCreator(object):
     """ This class creates upload tasks using configured cloud accounts and
     configured image catalog glance client """
 
-    def __init__(self, log, loop, accounts, glance_client):
-        self._log = log
-        self._loop = loop
-        self._accounts = accounts
+    def __init__(self, project, glance_client):
+        self._log = project.log
+        self._loop = project.loop
         self._glance_client = glance_client
+        self._project = project
+
+    @property
+    def accounts(self):
+        return self._project.cloud_accounts
 
     @asyncio.coroutine
     def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None):
@@ -329,14 +392,14 @@ class GlanceClientUploadTaskCreator(object):
 
         tasks = []
         for account_name in account_names:
-            if account_name not in self._accounts:
+            if account_name not in self.accounts:
                 raise AccountNotFoundError("Could not find account %s", account_name)
 
         # For each account name provided, create a pipe (GlanceImagePipeGen)
         # which feeds data into the UploadTask while also monitoring the various
         # transmit stats (progress, bytes written, bytes per second, etc)
         for account_name in account_names:
-            account = self._accounts[account_name]
+            account = self.accounts[account_name]
             self._log.debug("creating task for account %s", account.name)
             glance_data_gen = self._glance_client.get_image_data(image_info.id)
 
@@ -397,6 +460,75 @@ class GlanceClientUploadTaskCreator(object):
             create_msg.image_checksum if "image_checksum" in create_msg else None)
             )
 
+class ImageMgrProject(ManoProject):
+
+    def __init__(self, name, tasklet, **kw):
+        super(ImageMgrProject, self).__init__(tasklet.log, name)
+        self.update(tasklet)
+        try:
+            self.glance_client = kw['client']
+        except KeyError as e:
+            self._log.exception("kw {}: {}".format(kw, e))
+
+        self.cloud_cfg_subscriber = None
+        self.job_controller = None
+        self.task_creator = None
+        self.rpc_handler = None
+        self.show_handler = None
+
+        self.cloud_accounts = {}
+
+    @asyncio.coroutine
+    def register(self):
+        try:
+            self.log.debug("creating cloud account handler")
+            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self._log,
+                                                               self._dts,
+                                                               self._log_hdl,
+                                                               self)
+            yield from self.cloud_cfg_subscriber.register(
+                    self.on_cloud_account_create,
+                    self.on_cloud_account_delete
+                    )
+
+            self.job_controller = upload.ImageUploadJobController(
+                    self
+                    )
+
+            self.task_creator = GlanceClientUploadTaskCreator(
+                    self, self.glance_client,
+                    )
+
+            self.rpc_handler = ImageDTSRPCHandler(
+                    self, self.glance_client, self.task_creator,
+                    self.job_controller,
+                    )
+            yield from self.rpc_handler.register()
+
+            self.show_handler = ImageDTSShowHandler(
+                    self, self.job_controller,
+                    )
+            yield from self.show_handler.register()
+        except Exception as e:
+            self.log.exception("Error during project {} register: e".
+                               format(self.name, e))
+
+    def deregister(self):
+        self.log.debug("De-register handlers for project: {}".format(self.name))
+        self.rpc_handler.deregister()
+        self.show_handler.deregister()
+        self.cloud_cfg_subscriber.deregister()
+
+    def on_cloud_account_create(self, account):
+        self.log.debug("adding cloud account: %s", account.name)
+        self.cloud_accounts[account.name] = account
+
+    def on_cloud_account_delete(self, account_name):
+        self.log.debug("deleting cloud account: %s", account_name)
+        if account_name not in self.cloud_accounts:
+            self.log.warning("cloud account not found: %s", account_name)
+        else:
+            del self.cloud_accounts[account_name]
 
 class ImageManagerTasklet(rift.tasklets.Tasklet):
     """
@@ -409,16 +541,13 @@ class ImageManagerTasklet(rift.tasklets.Tasklet):
         super().__init__(*args, **kwargs)
         self.rwlog.set_category("rw-mano-log")
 
-        self.cloud_cfg_subscriber = None
         self.http_proxy = None
         self.proxy_server = None
         self.dts = None
-        self.job_controller = None
-        self.cloud_accounts = {}
         self.glance_client = None
-        self.task_creator = None
-        self.rpc_handler = None
-        self.show_handler = None
+        self.project_handler = None
+
+        self.projects = {}
 
     def start(self):
         super().start()
@@ -443,13 +572,6 @@ class ImageManagerTasklet(rift.tasklets.Tasklet):
     @asyncio.coroutine
     def init(self):
         try:
-            self.log.debug("creating cloud account handler")
-            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self.log, self.dts, self.log_hdl)
-            self.cloud_cfg_subscriber.register(
-                    self.on_cloud_account_create,
-                    self.on_cloud_account_delete
-                    )
-
             self.log.debug("creating http proxy server")
 
             self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop)
@@ -459,43 +581,18 @@ class ImageManagerTasklet(rift.tasklets.Tasklet):
                     )
             self.proxy_server.start()
 
-            self.job_controller = upload.ImageUploadJobController(
-                    self.log, self.loop
-                    )
-
             self.glance_client = glance_client.OpenstackGlanceClient.from_token(
                     self.log, "127.0.0.1", "9292", "test"
                     )
 
-            self.task_creator = GlanceClientUploadTaskCreator(
-                    self.log, self.loop, self.cloud_accounts, self.glance_client
-                    )
-
-            self.rpc_handler = ImageDTSRPCHandler(
-                    self.log, self.loop, self.dts, self.cloud_accounts, self.glance_client, self.task_creator,
-                    self.job_controller
-                    )
-            yield from self.rpc_handler.register()
-
-            self.show_handler = ImageDTSShowHandler(
-                    self.log, self.loop, self.dts, self.job_controller
-                    )
-            yield from self.show_handler.register()
+            self.log.debug("Creating project handler")
+            self.project_handler = ProjectHandler(self, ImageMgrProject,
+                                                  client=self.glance_client)
+            self.project_handler.register()
 
         except Exception as e:
             self.log.exception("error during init")
 
-    def on_cloud_account_create(self, account):
-        self.log.debug("adding cloud account: %s", account.name)
-        self.cloud_accounts[account.name] = account
-
-    def on_cloud_account_delete(self, account_name):
-        self.log.debug("deleting cloud account: %s", account_name)
-        if account_name not in self.cloud_accounts:
-            self.log.warning("cloud account not found: %s", account_name)
-
-        del self.cloud_accounts[account_name]
-
     @asyncio.coroutine
     def run(self):
         pass