X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwimagemgr%2Frift%2Ftasklets%2Frwimagemgr%2Ftasklet.py;fp=rwlaunchpad%2Fplugins%2Frwimagemgr%2Frift%2Ftasklets%2Frwimagemgr%2Ftasklet.py;h=9ea9cbc7c7bff18cdf943da4b3cdb660c8690d4a;hb=4870d0ee29789b859931e4e2c73e13dcb29537d5;hp=027e58212bafffe95ea0934888203624c8b4beb5;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py b/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py index 027e5821..9ea9cbc7 100644 --- a/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py +++ b/rwlaunchpad/plugins/rwimagemgr/rift/tasklets/rwimagemgr/tasklet.py @@ -22,6 +22,13 @@ import time import rift.tasklets import rift.mano.cloud +from rift.mano.utils.project import ( + ManoProject, + ProjectConfigCallbacks, + ProjectHandler, + get_add_delete_update_cfgs, + DEFAULT_PROJECT, + ) from . import glance_proxy_server from . import glance_client @@ -53,22 +60,30 @@ class ImageNotFoundError(ImageRequestError): class CloudAccountDtsHandler(object): - def __init__(self, log, dts, log_hdl): + def __init__(self, log, dts, log_hdl, project): self._dts = dts self._log = log self._log_hdl = log_hdl self._cloud_cfg_subscriber = None + self._project = project + @asyncio.coroutine def register(self, on_add_apply, on_delete_apply): - self._log.debug("creating cloud account config handler") + self._log.debug("Project {}: creating cloud account config handler". + format(self._project.name)) self._cloud_cfg_subscriber = rift.mano.cloud.CloudAccountConfigSubscriber( - self._dts, self._log, self._log_hdl, + self._dts, self._log, self._log_hdl, self._project, rift.mano.cloud.CloudAccountConfigCallbacks( on_add_apply=on_add_apply, on_delete_apply=on_delete_apply, ) ) - self._cloud_cfg_subscriber.register() + yield from self._cloud_cfg_subscriber.register() + + def deregister(self): + self._log.debug("Project {}: Removing cloud account config handler". + format(self._project.name)) + self._cloud_cfg_subscriber.deregister() def openstack_image_to_image_info(openstack_image): @@ -81,13 +96,19 @@ def openstack_image_to_image_info(openstack_image): A ImageInfo CAL protobuf message """ - image_info = RwcalYang.ImageInfoItem() + image_info = RwcalYang.YangData_RwProject_Project_VimResources_ImageinfoList() copy_fields = ["id", "name", "checksum", "container_format", "disk_format"] for field in copy_fields: value = getattr(openstack_image, field) setattr(image_info, field, value) + value = getattr(openstack_image, "properties") + for key in value: + prop = image_info.properties.add() + prop.name = key + prop.property_value = value[key] + image_info.state = openstack_image.status return image_info @@ -95,19 +116,21 @@ def openstack_image_to_image_info(openstack_image): class ImageDTSShowHandler(object): """ A DTS publisher for the upload-jobs data container """ - def __init__(self, log, loop, dts, job_controller): - self._log = log - self._loop = loop - self._dts = dts + def __init__(self, project, job_controller): + self._log = project.log + self._loop = project.loop + self._dts = project.dts self._job_controller = job_controller + self._project = project self._subscriber = None + def get_xpath(self): + return self._project.add_project("D,/rw-image-mgmt:upload-jobs") + @asyncio.coroutine def register(self): """ Register as a publisher and wait for reg_ready to complete """ - def get_xpath(): - return "D,/rw-image-mgmt:upload-jobs" @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): @@ -119,7 +142,7 @@ class ImageDTSShowHandler(object): xact_info.respond_xpath( rwdts.XactRspCode.ACK, - xpath=get_xpath(), + xpath=self.get_xpath(), msg=jobs_pb_msg, ) @@ -130,7 +153,7 @@ class ImageDTSShowHandler(object): reg_event.set() self._subscriber = yield from self._dts.register( - xpath=get_xpath(), + xpath=self.get_xpath(), handler=rift.tasklets.DTS.RegistrationHandler( on_prepare=on_prepare, on_ready=on_ready, @@ -141,18 +164,31 @@ class ImageDTSShowHandler(object): yield from reg_event.wait() + def deregister(self): + self._log.debug("Project {}: De-register show image handler". + format(self._project.name)) + if self._subscriber: + self._subscriber.delete_element(self.get_xpath()) + self._subscriber.deregister() + self._subscriber = None + class ImageDTSRPCHandler(object): """ A DTS publisher for the upload-job RPC's """ - def __init__(self, log, loop, dts, accounts, glance_client, upload_task_creator, job_controller): - self._log = log - self._loop = loop - self._dts = dts - self._accounts = accounts + def __init__(self, project, glance_client, upload_task_creator, job_controller): + self._log = project.log + self._loop = project.loop + self._dts = project.dts self._glance_client = glance_client self._upload_task_creator = upload_task_creator self._job_controller = job_controller + self._project = project - self._subscriber = None + self._create = None + self._cancel = None + + @property + def accounts(self): + return self._project.cloud_accounts @asyncio.coroutine def _register_create_upload_job(self): @@ -164,13 +200,20 @@ class ImageDTSRPCHandler(object): create_msg = msg account_names = create_msg.cloud_account + + self._log.debug("Create upload job msg: {} ".format(msg.as_dict())) + + if not self._project.rpc_check(msg, xact_info): + return + # If cloud accounts were not specified, upload image to all cloud account if not account_names: - account_names = list(self._accounts.keys()) + account_names = list(self.accounts.keys()) - for account_name in account_names: - if account_name not in self._accounts: - raise AccountNotFoundError("Could not find account %s", account_name) + else: + for account_name in account_names: + if account_name not in self.accounts: + raise AccountNotFoundError("Could not find account %s", account_name) if create_msg.has_field("external_url"): glance_image = yield from self._upload_task_creator.create_glance_image_from_url_create_rpc( @@ -195,6 +238,8 @@ class ImageDTSRPCHandler(object): ) elif create_msg.has_field("onboarded_image"): + self._log.debug("onboarded_image {} to accounts {}". + format(create_msg.onboarded_image, account_names)) tasks = yield from self._upload_task_creator.create_tasks_from_onboarded_create_rpc( account_names, create_msg.onboarded_image ) @@ -203,7 +248,7 @@ class ImageDTSRPCHandler(object): else: raise ImageRequestError("an image selection must be provided") - rpc_out_msg = RwImageMgmtYang.CreateUploadJobOutput(job_id=job_id) + rpc_out_msg = RwImageMgmtYang.YangOutput_RwImageMgmt_CreateUploadJob(job_id=job_id) xact_info.respond_xpath( rwdts.XactRspCode.ACK, @@ -217,14 +262,14 @@ class ImageDTSRPCHandler(object): def on_ready(_, status): reg_event.set() - self._subscriber = yield from self._dts.register( - xpath="I," + get_xpath(), - handler=rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_prepare, - on_ready=on_ready, - ), - flags=rwdts.Flag.PUBLISHER, - ) + self._create = yield from self._dts.register( + xpath="I," + get_xpath(), + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_prepare, + on_ready=on_ready, + ), + flags=rwdts.Flag.PUBLISHER, + ) yield from reg_event.wait() @@ -235,6 +280,9 @@ class ImageDTSRPCHandler(object): @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): + if not self._project.rpc_check(msg, xact_info): + return + if not msg.has_field("job_id"): self._log.error("cancel-upload-job missing job-id field.") xact_info.respond_xpath(rwdts.XactRspCode.NACK) @@ -256,14 +304,14 @@ class ImageDTSRPCHandler(object): def on_ready(_, status): reg_event.set() - self._subscriber = yield from self._dts.register( - xpath="I," + get_xpath(), - handler=rift.tasklets.DTS.RegistrationHandler( - on_prepare=on_prepare, - on_ready=on_ready, - ), - flags=rwdts.Flag.PUBLISHER, - ) + self._cancel = yield from self._dts.register( + xpath="I," + get_xpath(), + handler=rift.tasklets.DTS.RegistrationHandler( + on_prepare=on_prepare, + on_ready=on_ready, + ), + flags=rwdts.Flag.PUBLISHER, + ) yield from reg_event.wait() @@ -273,16 +321,31 @@ class ImageDTSRPCHandler(object): yield from self._register_create_upload_job() yield from self._register_cancel_upload_job() + def deregister(self): + self._log.debug("Project {}: Deregister image rpc handlers". + format(self._project.name)) + if self._create: + self._create.deregister() + self._create = None + + if self._cancel: + self._cancel.deregister() + self._cancel = None + class GlanceClientUploadTaskCreator(object): """ This class creates upload tasks using configured cloud accounts and configured image catalog glance client """ - def __init__(self, log, loop, accounts, glance_client): - self._log = log - self._loop = loop - self._accounts = accounts + def __init__(self, project, glance_client): + self._log = project.log + self._loop = project.loop self._glance_client = glance_client + self._project = project + + @property + def accounts(self): + return self._project.cloud_accounts @asyncio.coroutine def create_tasks(self, account_names, image_id=None, image_name=None, image_checksum=None): @@ -329,14 +392,14 @@ class GlanceClientUploadTaskCreator(object): tasks = [] for account_name in account_names: - if account_name not in self._accounts: + if account_name not in self.accounts: raise AccountNotFoundError("Could not find account %s", account_name) # For each account name provided, create a pipe (GlanceImagePipeGen) # which feeds data into the UploadTask while also monitoring the various # transmit stats (progress, bytes written, bytes per second, etc) for account_name in account_names: - account = self._accounts[account_name] + account = self.accounts[account_name] self._log.debug("creating task for account %s", account.name) glance_data_gen = self._glance_client.get_image_data(image_info.id) @@ -397,6 +460,75 @@ class GlanceClientUploadTaskCreator(object): create_msg.image_checksum if "image_checksum" in create_msg else None) ) +class ImageMgrProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(ImageMgrProject, self).__init__(tasklet.log, name) + self.update(tasklet) + try: + self.glance_client = kw['client'] + except KeyError as e: + self._log.exception("kw {}: {}".format(kw, e)) + + self.cloud_cfg_subscriber = None + self.job_controller = None + self.task_creator = None + self.rpc_handler = None + self.show_handler = None + + self.cloud_accounts = {} + + @asyncio.coroutine + def register(self): + try: + self.log.debug("creating cloud account handler") + self.cloud_cfg_subscriber = CloudAccountDtsHandler(self._log, + self._dts, + self._log_hdl, + self) + yield from self.cloud_cfg_subscriber.register( + self.on_cloud_account_create, + self.on_cloud_account_delete + ) + + self.job_controller = upload.ImageUploadJobController( + self + ) + + self.task_creator = GlanceClientUploadTaskCreator( + self, self.glance_client, + ) + + self.rpc_handler = ImageDTSRPCHandler( + self, self.glance_client, self.task_creator, + self.job_controller, + ) + yield from self.rpc_handler.register() + + self.show_handler = ImageDTSShowHandler( + self, self.job_controller, + ) + yield from self.show_handler.register() + except Exception as e: + self.log.exception("Error during project {} register: e". + format(self.name, e)) + + def deregister(self): + self.log.debug("De-register handlers for project: {}".format(self.name)) + self.rpc_handler.deregister() + self.show_handler.deregister() + self.cloud_cfg_subscriber.deregister() + + def on_cloud_account_create(self, account): + self.log.debug("adding cloud account: %s", account.name) + self.cloud_accounts[account.name] = account + + def on_cloud_account_delete(self, account_name): + self.log.debug("deleting cloud account: %s", account_name) + if account_name not in self.cloud_accounts: + self.log.warning("cloud account not found: %s", account_name) + else: + del self.cloud_accounts[account_name] class ImageManagerTasklet(rift.tasklets.Tasklet): """ @@ -409,16 +541,13 @@ class ImageManagerTasklet(rift.tasklets.Tasklet): super().__init__(*args, **kwargs) self.rwlog.set_category("rw-mano-log") - self.cloud_cfg_subscriber = None self.http_proxy = None self.proxy_server = None self.dts = None - self.job_controller = None - self.cloud_accounts = {} self.glance_client = None - self.task_creator = None - self.rpc_handler = None - self.show_handler = None + self.project_handler = None + + self.projects = {} def start(self): super().start() @@ -443,13 +572,6 @@ class ImageManagerTasklet(rift.tasklets.Tasklet): @asyncio.coroutine def init(self): try: - self.log.debug("creating cloud account handler") - self.cloud_cfg_subscriber = CloudAccountDtsHandler(self.log, self.dts, self.log_hdl) - self.cloud_cfg_subscriber.register( - self.on_cloud_account_create, - self.on_cloud_account_delete - ) - self.log.debug("creating http proxy server") self.http_proxy = glance_proxy_server.QuickProxyServer(self.log, self.loop) @@ -459,43 +581,18 @@ class ImageManagerTasklet(rift.tasklets.Tasklet): ) self.proxy_server.start() - self.job_controller = upload.ImageUploadJobController( - self.log, self.loop - ) - self.glance_client = glance_client.OpenstackGlanceClient.from_token( self.log, "127.0.0.1", "9292", "test" ) - self.task_creator = GlanceClientUploadTaskCreator( - self.log, self.loop, self.cloud_accounts, self.glance_client - ) - - self.rpc_handler = ImageDTSRPCHandler( - self.log, self.loop, self.dts, self.cloud_accounts, self.glance_client, self.task_creator, - self.job_controller - ) - yield from self.rpc_handler.register() - - self.show_handler = ImageDTSShowHandler( - self.log, self.loop, self.dts, self.job_controller - ) - yield from self.show_handler.register() + self.log.debug("Creating project handler") + self.project_handler = ProjectHandler(self, ImageMgrProject, + client=self.glance_client) + self.project_handler.register() except Exception as e: self.log.exception("error during init") - def on_cloud_account_create(self, account): - self.log.debug("adding cloud account: %s", account.name) - self.cloud_accounts[account.name] = account - - def on_cloud_account_delete(self, account_name): - self.log.debug("deleting cloud account: %s", account_name) - if account_name not in self.cloud_accounts: - self.log.warning("cloud account not found: %s", account_name) - - del self.cloud_accounts[account_name] - @asyncio.coroutine def run(self): pass