X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=rwlaunchpad%2Fplugins%2Frwimagemgr%2Frift%2Ftasklets%2Frwimagemgr%2Ftasklet.py;h=f3ba2ed8db7d4ef57920a77762c6f3b34986fb7b;hb=a3bb91f092d378448cb870eccd45d43865de143c;hp=027e58212bafffe95ea0934888203624c8b4beb5;hpb=6f07e6f33f751ab4ffe624f6037f887b243bece2;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py b/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py index 027e5821..f3ba2ed8 100644 --- a/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py +++ b/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py @@ -22,6 +22,13 @@ import time import rift.tasklets import rift.mano.cloud +from rift.mano.utils.project import ( + ManoProject, + ProjectConfigCallbacks, + ProjectHandler, + get_add_delete_update_cfgs, + DEFAULT_PROJECT, + ) from . import glance_proxy_server from . import glance_client @@ -53,16 +60,18 @@ class ImageNotFoundError(ImageRequestError): class CloudAccountDtsHandler(object): - def __init__(self, log, dts, log_hdl): + def __init__(self, log, dts, log_hdl, project): self._dts = dts self._log = log self._log_hdl = log_hdl self._cloud_cfg_subscriber = None + self._project = project def register(self, on_add_apply, on_delete_apply): - self._log.debug("creating cloud account config handler") + self._log.debug("Project {}: creating cloud account config handler". + format(self._project.name)) self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber( - self._dts, self._log, self._log_hdl, + self._dts, self._log, self._log_hdl, self._project, rift.mano.cloud.CloudAccountConfigCallbacks( on_add_apply=on_add_apply, on_delete_apply=on_delete_apply, @@ -70,6 +79,11 @@ class CloudAccountDtsHandler(object): ) self._cloud_cfg_subscriber.register() + def deregister(self): + self._log.debug("Project {}: Removing cloud account config handler". + format(self._project.name)) + self._cloud_cfg_subscriber.deregister() + def openstack_image_to_image_info(openstack_image): """Convert the OpenstackImage to a ImageInfo protobuf message @@ -95,19 +109,21 @@ def openstack_image_to_image_info(openstack_image): class ImageDTSShowHandler(object): """ A DTS publisher for the upload-jobs data container """ - def __init__(self, log, loop, dts, job_controller): + def __init__(self, log, loop, dts, job_controller, project): self._log = log self._loop = loop self._dts = dts self._job_controller = job_controller + self._project = project self._subscriber = None + def get_xpath(self): + return self._project.add_project("D,/rw-image-mgmt:upload-jobs") + @asyncio.coroutine def register(self): """ Register as a publisher and wait for reg_ready to complete """ - def get_xpath(): - return "D,/rw-image-mgmt:upload-jobs" @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): @@ -119,7 +135,7 @@ class ImageDTSShowHandler(object): xact_info.respond_xpath( rwdts.XactRspCode.ACK, - xpath=get_xpath(), + xpath=self.get_xpath(), msg=jobs_pb_msg, ) @@ -130,7 +146,7 @@ class ImageDTSShowHandler(object): reg_event.set() self._subscriber = yield from self._dts.register( - xpath=get_xpath(), + xpath=self.get_xpath(), handler=rift.tasklets.DTS.RegistrationHandler( on_prepare=on_prepare, on_ready=on_ready, @@ -141,9 +157,18 @@ class ImageDTSShowHandler(object): yield from reg_event.wait() + def deregister(self): + self._log.debug("Project {}: De-register show image handler". + format(self._project.name)) + if self._subscriber: + self._subscriber.delete_element(self.get_xpath()) + self._subscriber.deregister() + self._subscriber = None + class ImageDTSRPCHandler(object): """ A DTS publisher for the upload-job RPC's """ - def __init__(self, log, loop, dts, accounts, glance_client, upload_task_creator, job_controller): + def __init__(self, log, loop, dts, accounts, glance_client, + upload_task_creator, job_controller, project): self._log = log self._loop = loop self._dts = dts @@ -151,8 +176,10 @@ class ImageDTSRPCHandler(object): self._glance_client = glance_client self._upload_task_creator = upload_task_creator self._job_controller = job_controller + self._project = project - self._subscriber = None + self._create = None + self._cancel = None @asyncio.coroutine def _register_create_upload_job(self): @@ -164,6 +191,10 @@ class ImageDTSRPCHandler(object): create_msg = msg account_names = create_msg.cloud_account + + if not self._project.rpc_check(msg, xact_info): + return + # If cloud accounts were not specified, upload image to all cloud account if not account_names: account_names = list(self._accounts.keys()) @@ -217,14 +248,14 @@ class ImageDTSRPCHandler(object): def on_ready(_, status): reg_event.set() - self._subscriber = yield from self._dts.register( - xpath="I," + get_xpath(), - handler=rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_prepare, - on_ready=on_ready, - ), - flags=rwdts.Flag.PUBLISHER, - ) + self._create = yield from self._dts.register( + xpath="I," + get_xpath(), + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_prepare, + on_ready=on_ready, + ), + flags=rwdts.Flag.PUBLISHER, + ) yield from reg_event.wait() @@ -235,6 +266,9 @@ class ImageDTSRPCHandler(object): @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): + if not self._project.rpc_check(msg, xact_info): + return + if not msg.has_field("job_id"): self._log.error("cancel-upload-job missing job-id field.") xact_info.respond_xpath(rwdts.XactRspCode.NACK) @@ -256,14 +290,14 @@ class ImageDTSRPCHandler(object): def on_ready(_, status): reg_event.set() - self._subscriber = yield from self._dts.register( - xpath="I," + get_xpath(), - handler=rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_prepare, - on_ready=on_ready, - ), - flags=rwdts.Flag.PUBLISHER, - ) + self._cancel = yield from self._dts.register( + xpath="I," + get_xpath(), + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_prepare, + on_ready=on_ready, + ), + flags=rwdts.Flag.PUBLISHER, + ) yield from reg_event.wait() @@ -273,16 +307,28 @@ class ImageDTSRPCHandler(object): yield from self._register_create_upload_job() yield from self._register_cancel_upload_job() + def deregister(self): + self._log.debug("Project {}: Deregister image rpc handlers". + format(self._project.name)) + if self._create: + self._create.deregister() + self._create = None + + if self._cancel: + self._cancel.deregister() + self._cancel = None + class GlanceClientUploadTaskCreator(object): """ This class creates upload tasks using configured cloud accounts and configured image catalog glance client """ - def __init__(self, log, loop, accounts, glance_client): + def __init__(self, log, loop, accounts, glance_client, project): self._log = log self._loop = loop self._accounts = accounts self._glance_client = glance_client + self._project = project @asyncio.coroutine def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None): @@ -397,6 +443,78 @@ class GlanceClientUploadTaskCreator(object): create_msg.image_checksum if "image_checksum" in create_msg else None) ) +class ImageMgrProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(ImageMgrProject, self).__init__(tasklet.log, name) + self.update(tasklet) + try: + self.glance_client = kw['client'] + except KeyError as e: + self._log.exception("kw {}: {}".format(kw, e)) + + self.cloud_cfg_subscriber = None + self.job_controller = None + self.task_creator = None + self.rpc_handler = None + self.show_handler = None + + self.cloud_accounts = {} + + @asyncio.coroutine + def register(self): + try: + self.log.debug("creating cloud account handler") + self.cloud_cfg_subscriber = CloudAccountDtsHandler(self._log, + self._dts, + self._log_hdl, + self) + self.cloud_cfg_subscriber.register( + self.on_cloud_account_create, + self.on_cloud_account_delete + ) + + self.job_controller = upload.ImageUploadJobController( + self.log, self.loop, self + ) + + self.task_creator = GlanceClientUploadTaskCreator( + self.log, self.loop, self.cloud_accounts, + self.glance_client, self + ) + + self.rpc_handler = ImageDTSRPCHandler( + self.log, self.loop, self.dts, self.cloud_accounts, + self.glance_client, self.task_creator, + self.job_controller, self + ) + yield from self.rpc_handler.register() + + self.show_handler = ImageDTSShowHandler( + self.log, self.loop, self.dts, self.job_controller, self + ) + yield from self.show_handler.register() + except Exception as e: + self.log.exception("Error during project {} register: e". + format(self.name, e)) + + def deregister(self): + self.log.debug("De-register handlers for project: {}".format(self.name)) + self.rpc_handler.deregister() + self.show_handler.deregister() + self.cloud_cfg_subscriber.deregister() + + def on_cloud_account_create(self, account): + self.log.debug("adding cloud account: %s", account.name) + self.cloud_accounts[account.name] = account + + def on_cloud_account_delete(self, account_name): + self.log.debug("deleting cloud account: %s", account_name) + if account_name not in self.cloud_accounts: + self.log.warning("cloud account not found: %s", account_name) + else: + del self.cloud_accounts[account_name] + class ImageManagerTasklet(rift.tasklets.Tasklet): """ @@ -409,16 +527,13 @@ class ImageManagerTasklet(rift.tasklets.Tasklet): super().__init__(*args, **kwargs) self.rwlog.set_category("rw-mano-log") - self.cloud_cfg_subscriber = None self.http_proxy = None self.proxy_server = None self.dts = None - self.job_controller = None - self.cloud_accounts = {} self.glance_client = None - self.task_creator = None - self.rpc_handler = None - self.show_handler = None + self.project_handler = None + + self.projects = {} def start(self): super().start() @@ -443,13 +558,6 @@ class ImageManagerTasklet(rift.tasklets.Tasklet): @asyncio.coroutine def init(self): try: - self.log.debug("creating cloud account handler") - self.cloud_cfg_subscriber = CloudAccountDtsHandler(self.log, self.dts, self.log_hdl) - self.cloud_cfg_subscriber.register( - self.on_cloud_account_create, - self.on_cloud_account_delete - ) - self.log.debug("creating http proxy server") self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop) @@ -459,43 +567,18 @@ class ImageManagerTasklet(rift.tasklets.Tasklet): ) self.proxy_server.start() - self.job_controller = upload.ImageUploadJobController( - self.log, self.loop - ) - self.glance_client = glance_client.OpenstackGlanceClient.from_token( self.log, "127.0.0.1", "9292", "test" ) - self.task_creator = GlanceClientUploadTaskCreator( - self.log, self.loop, self.cloud_accounts, self.glance_client - ) - - self.rpc_handler = ImageDTSRPCHandler( - self.log, self.loop, self.dts, self.cloud_accounts, self.glance_client, self.task_creator, - self.job_controller - ) - yield from self.rpc_handler.register() - - self.show_handler = ImageDTSShowHandler( - self.log, self.loop, self.dts, self.job_controller - ) - yield from self.show_handler.register() + self.log.debug("Creating project handler") + self.project_handler = ProjectHandler(self, ImageMgrProject, + client=self.glance_client) + self.project_handler.register() except Exception as e: self.log.exception("error during init") - def on_cloud_account_create(self, account): - self.log.debug("adding cloud account: %s", account.name) - self.cloud_accounts[account.name] = account - - def on_cloud_account_delete(self, account_name): - self.log.debug("deleting cloud account: %s", account_name) - if account_name not in self.cloud_accounts: - self.log.warning("cloud account not found: %s", account_name) - - del self.cloud_accounts[account_name] - @asyncio.coroutine def run(self): pass