Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwimagemgr / rift / tasklets / rwimagemgr / tasklet.py
index 027e582..f3ba2ed 100644 (file)
@@ -22,6 +22,13 @@ import time
 
 import rift.tasklets
 import rift.mano.cloud
+from rift.mano.utils.project import (
+    ManoProject,
+    ProjectConfigCallbacks,
+    ProjectHandler,
+    get_add_delete_update_cfgs,
+    DEFAULT_PROJECT,
+    )
 
 from . import glance_proxy_server
 from . import glance_client
@@ -53,16 +60,18 @@ class ImageNotFoundError(ImageRequestError):
 
 
 class CloudAccountDtsHandler(object):
-    def __init__(self, log, dts, log_hdl):
+    def __init__(self, log, dts, log_hdl, project):
         self._dts = dts
         self._log = log
         self._log_hdl = log_hdl
         self._cloud_cfg_subscriber = None
+        self._project = project
 
     def register(self, on_add_apply, on_delete_apply):
-        self._log.debug("creating cloud account config handler")
+        self._log.debug("Project {}: creating cloud account config handler".
+                        format(self._project.name))
         self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber(
-                self._dts, self._log, self._log_hdl,
+                self._dts, self._log, self._log_hdl, self._project,
                 rift.mano.cloud.CloudAccountConfigCallbacks(
                     on_add_apply=on_add_apply,
                     on_delete_apply=on_delete_apply,
@@ -70,6 +79,11 @@ class CloudAccountDtsHandler(object):
                 )
         self._cloud_cfg_subscriber.register()
 
+    def deregister(self):
+        self._log.debug("Project {}: Removing cloud account config handler".
+                        format(self._project.name))
+        self._cloud_cfg_subscriber.deregister()
+
 
 def openstack_image_to_image_info(openstack_image):
     """Convert the OpenstackImage to a ImageInfo protobuf message
@@ -95,19 +109,21 @@ def openstack_image_to_image_info(openstack_image):
 
 class ImageDTSShowHandler(object):
     """ A DTS publisher for the upload-jobs data container """
-    def __init__(self, log, loop, dts, job_controller):
+    def __init__(self, log, loop, dts, job_controller, project):
         self._log = log
         self._loop = loop
         self._dts = dts
         self._job_controller = job_controller
+        self._project = project
 
         self._subscriber = None
 
+    def get_xpath(self):
+        return self._project.add_project("D,/rw-image-mgmt:upload-jobs")
+
     @asyncio.coroutine
     def register(self):
         """ Register as a publisher and wait for reg_ready to complete """
-        def get_xpath():
-            return "D,/rw-image-mgmt:upload-jobs"
 
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
@@ -119,7 +135,7 @@ class ImageDTSShowHandler(object):
 
             xact_info.respond_xpath(
                     rwdts.XactRspCode.ACK,
-                    xpath=get_xpath(),
+                    xpath=self.get_xpath(),
                     msg=jobs_pb_msg,
                     )
 
@@ -130,7 +146,7 @@ class ImageDTSShowHandler(object):
             reg_event.set()
 
         self._subscriber = yield from self._dts.register(
-                xpath=get_xpath(),
+                xpath=self.get_xpath(),
                 handler=rift.tasklets.DTS.RegistrationHandler(
                     on_prepare=on_prepare,
                     on_ready=on_ready,
@@ -141,9 +157,18 @@ class ImageDTSShowHandler(object):
         yield from reg_event.wait()
 
 
+    def deregister(self):
+        self._log.debug("Project {}: De-register show image handler".
+                        format(self._project.name))
+        if self._subscriber:
+            self._subscriber.delete_element(self.get_xpath())
+            self._subscriber.deregister()
+            self._subscriber = None
+
 class ImageDTSRPCHandler(object):
     """ A DTS publisher for the upload-job RPC's """
-    def __init__(self, log, loop, dts, accounts, glance_client, upload_task_creator, job_controller):
+    def __init__(self, log, loop, dts, accounts, glance_client,
+                 upload_task_creator, job_controller, project):
         self._log = log
         self._loop = loop
         self._dts = dts
@@ -151,8 +176,10 @@ class ImageDTSRPCHandler(object):
         self._glance_client = glance_client
         self._upload_task_creator = upload_task_creator
         self._job_controller = job_controller
+        self._project = project
 
-        self._subscriber = None
+        self._create = None
+        self._cancel = None
 
     @asyncio.coroutine
     def _register_create_upload_job(self):
@@ -164,6 +191,10 @@ class ImageDTSRPCHandler(object):
             create_msg = msg
 
             account_names = create_msg.cloud_account
+
+            if not self._project.rpc_check(msg, xact_info):
+                return
+
             # If cloud accounts were not specified, upload image to all cloud account
             if not account_names:
                 account_names = list(self._accounts.keys())
@@ -217,14 +248,14 @@ class ImageDTSRPCHandler(object):
         def on_ready(_, status):
             reg_event.set()
 
-        self._subscriber = yield from self._dts.register(
-                xpath="I," + get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare,
-                    on_ready=on_ready,
-                    ),
-                flags=rwdts.Flag.PUBLISHER,
-                )
+        self._create = yield from self._dts.register(
+            xpath="I," + get_xpath(),
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=on_prepare,
+                on_ready=on_ready,
+            ),
+            flags=rwdts.Flag.PUBLISHER,
+        )
 
         yield from reg_event.wait()
 
@@ -235,6 +266,9 @@ class ImageDTSRPCHandler(object):
 
         @asyncio.coroutine
         def on_prepare(xact_info, action, ks_path, msg):
+            if not self._project.rpc_check(msg, xact_info):
+                return
+
             if not msg.has_field("job_id"):
                 self._log.error("cancel-upload-job missing job-id field.")
                 xact_info.respond_xpath(rwdts.XactRspCode.NACK)
@@ -256,14 +290,14 @@ class ImageDTSRPCHandler(object):
         def on_ready(_, status):
             reg_event.set()
 
-        self._subscriber = yield from self._dts.register(
-                xpath="I," + get_xpath(),
-                handler=rift.tasklets.DTS.RegistrationHandler(
-                    on_prepare=on_prepare,
-                    on_ready=on_ready,
-                    ),
-                flags=rwdts.Flag.PUBLISHER,
-                )
+        self._cancel = yield from self._dts.register(
+            xpath="I," + get_xpath(),
+            handler=rift.tasklets.DTS.RegistrationHandler(
+                on_prepare=on_prepare,
+                on_ready=on_ready,
+            ),
+            flags=rwdts.Flag.PUBLISHER,
+        )
 
         yield from reg_event.wait()
 
@@ -273,16 +307,28 @@ class ImageDTSRPCHandler(object):
         yield from self._register_create_upload_job()
         yield from self._register_cancel_upload_job()
 
+    def deregister(self):
+        self._log.debug("Project {}: Deregister image rpc handlers".
+                        format(self._project.name))
+        if self._create:
+            self._create.deregister()
+            self._create = None
+
+        if self._cancel:
+            self._cancel.deregister()
+            self._cancel = None
+
 
 class GlanceClientUploadTaskCreator(object):
     """ This class creates upload tasks using configured cloud accounts and
     configured image catalog glance client """
 
-    def __init__(self, log, loop, accounts, glance_client):
+    def __init__(self, log, loop, accounts, glance_client, project):
         self._log = log
         self._loop = loop
         self._accounts = accounts
         self._glance_client = glance_client
+        self._project = project
 
     @asyncio.coroutine
     def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None):
@@ -397,6 +443,78 @@ class GlanceClientUploadTaskCreator(object):
             create_msg.image_checksum if "image_checksum" in create_msg else None)
             )
 
+class ImageMgrProject(ManoProject):
+
+    def __init__(self, name, tasklet, **kw):
+        super(ImageMgrProject, self).__init__(tasklet.log, name)
+        self.update(tasklet)
+        try:
+            self.glance_client = kw['client']
+        except KeyError as e:
+            self._log.exception("kw {}: {}".format(kw, e))
+
+        self.cloud_cfg_subscriber = None
+        self.job_controller = None
+        self.task_creator = None
+        self.rpc_handler = None
+        self.show_handler = None
+
+        self.cloud_accounts = {}
+
+    @asyncio.coroutine
+    def register(self):
+        try:
+            self.log.debug("creating cloud account handler")
+            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self._log,
+                                                               self._dts,
+                                                               self._log_hdl,
+                                                               self)
+            self.cloud_cfg_subscriber.register(
+                    self.on_cloud_account_create,
+                    self.on_cloud_account_delete
+                    )
+
+            self.job_controller = upload.ImageUploadJobController(
+                    self.log, self.loop, self
+                    )
+
+            self.task_creator = GlanceClientUploadTaskCreator(
+                    self.log, self.loop, self.cloud_accounts,
+                    self.glance_client, self
+                    )
+
+            self.rpc_handler = ImageDTSRPCHandler(
+                    self.log, self.loop, self.dts, self.cloud_accounts,
+                    self.glance_client, self.task_creator,
+                    self.job_controller, self
+                    )
+            yield from self.rpc_handler.register()
+
+            self.show_handler = ImageDTSShowHandler(
+                    self.log, self.loop, self.dts, self.job_controller, self
+                    )
+            yield from self.show_handler.register()
+        except Exception as e:
+            self.log.exception("Error during project {} register: e".
+                               format(self.name, e))
+
+    def deregister(self):
+        self.log.debug("De-register handlers for project: {}".format(self.name))
+        self.rpc_handler.deregister()
+        self.show_handler.deregister()
+        self.cloud_cfg_subscriber.deregister()
+
+    def on_cloud_account_create(self, account):
+        self.log.debug("adding cloud account: %s", account.name)
+        self.cloud_accounts[account.name] = account
+
+    def on_cloud_account_delete(self, account_name):
+        self.log.debug("deleting cloud account: %s", account_name)
+        if account_name not in self.cloud_accounts:
+            self.log.warning("cloud account not found: %s", account_name)
+        else:
+            del self.cloud_accounts[account_name]
+
 
 class ImageManagerTasklet(rift.tasklets.Tasklet):
     """
@@ -409,16 +527,13 @@ class ImageManagerTasklet(rift.tasklets.Tasklet):
         super().__init__(*args, **kwargs)
         self.rwlog.set_category("rw-mano-log")
 
-        self.cloud_cfg_subscriber = None
         self.http_proxy = None
         self.proxy_server = None
         self.dts = None
-        self.job_controller = None
-        self.cloud_accounts = {}
         self.glance_client = None
-        self.task_creator = None
-        self.rpc_handler = None
-        self.show_handler = None
+        self.project_handler = None
+
+        self.projects = {}
 
     def start(self):
         super().start()
@@ -443,13 +558,6 @@ class ImageManagerTasklet(rift.tasklets.Tasklet):
     @asyncio.coroutine
     def init(self):
         try:
-            self.log.debug("creating cloud account handler")
-            self.cloud_cfg_subscriber = CloudAccountDtsHandler(self.log, self.dts, self.log_hdl)
-            self.cloud_cfg_subscriber.register(
-                    self.on_cloud_account_create,
-                    self.on_cloud_account_delete
-                    )
-
             self.log.debug("creating http proxy server")
 
             self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop)
@@ -459,43 +567,18 @@ class ImageManagerTasklet(rift.tasklets.Tasklet):
                     )
             self.proxy_server.start()
 
-            self.job_controller = upload.ImageUploadJobController(
-                    self.log, self.loop
-                    )
-
             self.glance_client = glance_client.OpenstackGlanceClient.from_token(
                     self.log, "127.0.0.1", "9292", "test"
                     )
 
-            self.task_creator = GlanceClientUploadTaskCreator(
-                    self.log, self.loop, self.cloud_accounts, self.glance_client
-                    )
-
-            self.rpc_handler = ImageDTSRPCHandler(
-                    self.log, self.loop, self.dts, self.cloud_accounts, self.glance_client, self.task_creator,
-                    self.job_controller
-                    )
-            yield from self.rpc_handler.register()
-
-            self.show_handler = ImageDTSShowHandler(
-                    self.log, self.loop, self.dts, self.job_controller
-                    )
-            yield from self.show_handler.register()
+            self.log.debug("Creating project handler")
+            self.project_handler = ProjectHandler(self, ImageMgrProject,
+                                                  client=self.glance_client)
+            self.project_handler.register()
 
         except Exception as e:
             self.log.exception("error during init")
 
-    def on_cloud_account_create(self, account):
-        self.log.debug("adding cloud account: %s", account.name)
-        self.cloud_accounts[account.name] = account
-
-    def on_cloud_account_delete(self, account_name):
-        self.log.debug("deleting cloud account: %s", account_name)
-        if account_name not in self.cloud_accounts:
-            self.log.warning("cloud account not found: %s", account_name)
-
-        del self.cloud_accounts[account_name]
-
     @asyncio.coroutine
     def run(self):
         pass