X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwlaunchpadtasklet%2Frift%2Ftasklets%2Frwlaunchpad%2Ftasklet.py;h=d416f9cdce2e71c439e0a6712eb2405352b79582;hb=9ad945aab0b5a992e1df860bede8ecc9b143470e;hp=0eff6160e82270f5f090367c5c4bafe436c8d614;hpb=6364d016e7f819903ff29a2ce160cb4cea61bf8f;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/tasklet.py b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/tasklet.py index 0eff6160..d416f9cd 100644 --- a/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/tasklet.py +++ b/rwlaunchpad/plugins/rwlaunchpadtasklet/rift/tasklets/rwlaunchpad/tasklet.py @@ -40,6 +40,12 @@ from gi.repository import ( import rift.tasklets import rift.mano.cloud import rift.mano.config_agent +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + get_add_delete_update_cfgs, + DEFAULT_PROJECT, + ) from rift.package import store from . import uploader @@ -53,73 +59,53 @@ MAX_BUFFER_SIZE = 1 * MB # Max. size loaded into memory! MAX_BODY_SIZE = 1 * MB # Max. size loaded into memory! -def get_add_delete_update_cfgs(dts_member_reg, xact, key_name): - # Unforunately, it is currently difficult to figure out what has exactly - # changed in this xact without Pbdelta support (RIFT-4916) - # As a workaround, we can fetch the pre and post xact elements and - # perform a comparison to figure out adds/deletes/updates - xact_cfgs = list(dts_member_reg.get_xact_elements(xact)) - curr_cfgs = list(dts_member_reg.elements) - - xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs} - curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs} - - # Find Adds - added_keys = set(xact_key_map) - set(curr_key_map) - added_cfgs = [xact_key_map[key] for key in added_keys] - - # Find Deletes - deleted_keys = set(curr_key_map) - set(xact_key_map) - deleted_cfgs = [curr_key_map[key] for key in deleted_keys] - - # Find Updates - updated_keys = set(curr_key_map) & set(xact_key_map) - updated_cfgs = [xact_key_map[key] for key in updated_keys if xact_key_map[key] != curr_key_map[key]] - - return added_cfgs, deleted_cfgs, updated_cfgs +class LaunchpadError(Exception): + pass +class LpProjectNotFound(Exception): + pass class CatalogDtsHandler(object): - def __init__(self, tasklet, app): + def __init__(self, project, app): self.app = app self.reg = None - self.tasklet = tasklet + self.project = project @property def log(self): - return self.tasklet.log + return self.project.log @property def dts(self): - return self.tasklet.dts + return self.project.dts class NsdCatalogDtsHandler(CatalogDtsHandler): - XPATH = "C,/nsd:nsd-catalog/nsd:nsd" + XPATH = "C,/project-nsd:nsd-catalog/project-nsd:nsd" def add_nsd(self, nsd): self.log.debug('nsd-catalog-handler:add:{}'.format(nsd.id)) - if nsd.id not in self.tasklet.nsd_catalog: - self.tasklet.nsd_catalog[nsd.id] = nsd + if nsd.id not in self.project.nsd_catalog: + self.project.nsd_catalog[nsd.id] = nsd else: self.log.error("nsd already in catalog: {}".format(nsd.id)) def update_nsd(self, nsd): self.log.debug('nsd-catalog-handler:update:{}'.format(nsd.id)) - if nsd.id in self.tasklet.nsd_catalog: - self.tasklet.nsd_catalog[nsd.id] = nsd + if nsd.id in self.project.nsd_catalog: + self.project.nsd_catalog[nsd.id] = nsd else: self.log.error("unrecognized NSD: {}".format(nsd.id)) def delete_nsd(self, nsd_id): self.log.debug('nsd-catalog-handler:delete:{}'.format(nsd_id)) - if nsd_id in self.tasklet.nsd_catalog: - del self.tasklet.nsd_catalog[nsd_id] + if nsd_id in self.project.nsd_catalog: + del self.project.nsd_catalog[nsd_id] else: self.log.error("unrecognized NSD: {}".format(nsd_id)) try: - self.tasklet.nsd_package_store.delete_package(nsd_id) + self.project.tasklet.nsd_package_store.delete_package(nsd_id) except store.PackageStoreError as e: self.log.warning("could not delete package from store: %s", str(e)) @@ -151,47 +137,54 @@ class NsdCatalogDtsHandler(CatalogDtsHandler): for cfg in update_cfgs: self.update_nsd(cfg) - self.log.debug("Registering for NSD catalog") + self.log.debug("Registering for NSD catalog in project". + format(self.project.name)) acg_handler = rift.tasklets.AppConfGroup.Handler( on_apply=apply_config, ) with self.dts.appconf_group_create(acg_handler) as acg: + xpath = self.project.add_project(NsdCatalogDtsHandler.XPATH) self.reg = acg.register( - xpath=NsdCatalogDtsHandler.XPATH, + xpath=xpath, flags=rwdts.Flag.SUBSCRIBER, ) + def deregister(self): + if self.reg: + self.reg.deregister() + self.reg = None + class VnfdCatalogDtsHandler(CatalogDtsHandler): - XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd" + XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd" def add_vnfd(self, vnfd): self.log.debug('vnfd-catalog-handler:add:{}'.format(vnfd.id)) - if vnfd.id not in self.tasklet.vnfd_catalog: - self.tasklet.vnfd_catalog[vnfd.id] = vnfd + if vnfd.id not in self.project.vnfd_catalog: + self.project.vnfd_catalog[vnfd.id] = vnfd else: self.log.error("VNFD already in catalog: {}".format(vnfd.id)) def update_vnfd(self, vnfd): self.log.debug('vnfd-catalog-handler:update:{}'.format(vnfd.id)) - if vnfd.id in self.tasklet.vnfd_catalog: - self.tasklet.vnfd_catalog[vnfd.id] = vnfd + if vnfd.id in self.project.vnfd_catalog: + self.project.vnfd_catalog[vnfd.id] = vnfd else: self.log.error("unrecognized VNFD: {}".format(vnfd.id)) def delete_vnfd(self, vnfd_id): self.log.debug('vnfd-catalog-handler:delete:{}'.format(vnfd_id)) - if vnfd_id in self.tasklet.vnfd_catalog: - del self.tasklet.vnfd_catalog[vnfd_id] + if vnfd_id in self.project.vnfd_catalog: + del self.project.vnfd_catalog[vnfd_id] else: self.log.error("unrecognized VNFD: {}".format(vnfd_id)) try: - self.tasklet.vnfd_package_store.delete_package(vnfd_id) + self.project.tasklet.vnfd_package_store.delete_package(vnfd_id) except store.PackageStoreError as e: self.log.warning("could not delete package from store: %s", str(e)) @@ -223,28 +216,36 @@ class VnfdCatalogDtsHandler(CatalogDtsHandler): for cfg in update_cfgs: self.update_vnfd(cfg) - self.log.debug("Registering for VNFD catalog") + self.log.debug("Registering for VNFD catalog in project {}". + format(self.project.name)) acg_handler = rift.tasklets.AppConfGroup.Handler( on_apply=apply_config, ) with self.dts.appconf_group_create(acg_handler) as acg: + xpath = self.project.add_project(VnfdCatalogDtsHandler.XPATH) self.reg = acg.register( - xpath=VnfdCatalogDtsHandler.XPATH, + xpath=xpath, flags=rwdts.Flag.SUBSCRIBER, ) + def deregister(self): + if self.reg: + self.reg.deregister() + self.reg = None + class CfgAgentAccountHandlers(object): - def __init__(self, dts, log, log_hdl, loop): + def __init__(self, dts, log, log_hdl, loop, project): self._dts = dts self._log = log self._log_hdl = log_hdl self._loop = loop + self._project = project self._log.debug("creating config agent account config handler") self.cfg_agent_cfg_handler = rift.mano.config_agent.ConfigAgentSubscriber( - self._dts, self._log, + self._dts, self._log, self._project, rift.mano.config_agent.ConfigAgentCallbacks( on_add_apply=self.on_cfg_agent_account_added, on_delete_apply=self.on_cfg_agent_account_deleted, @@ -253,7 +254,7 @@ class CfgAgentAccountHandlers(object): self._log.debug("creating config agent account opdata handler") self.cfg_agent_operdata_handler = rift.mano.config_agent.CfgAgentDtsOperdataHandler( - self._dts, self._log, self._loop, + self._dts, self._log, self._loop, self._project ) def on_cfg_agent_account_deleted(self, account): @@ -269,61 +270,65 @@ class CfgAgentAccountHandlers(object): self.cfg_agent_cfg_handler.register() yield from self.cfg_agent_operdata_handler.register() + def deregister(self): + self.cfg_agent_operdata_handler.deregister() + self.cfg_agent_cfg_handler.deregister() + + class CloudAccountHandlers(object): - def __init__(self, dts, log, log_hdl, loop, app): + def __init__(self, dts, log, log_hdl, loop, app, project): self._log = log self._log_hdl = log_hdl self._dts = dts self._loop = loop self._app = app + self._project = project - self._log.debug("creating cloud account config handler") + self._log.debug("Creating cloud account config handler for project {}". + format(project.name)) self.cloud_cfg_handler = rift.mano.cloud.CloudAccountConfigSubscriber( - self._dts, self._log, self._log_hdl, + self._dts, self._log, self._log_hdl, self._project, rift.mano.cloud.CloudAccountConfigCallbacks( on_add_apply=self.on_cloud_account_added, on_delete_apply=self.on_cloud_account_deleted, - ) + ), ) self._log.debug("creating cloud account opdata handler") self.cloud_operdata_handler = rift.mano.cloud.CloudAccountDtsOperdataHandler( - self._dts, self._log, self._loop, + self._dts, self._log, self._loop, self._project, ) def on_cloud_account_deleted(self, account_name): self._log.debug("cloud account deleted") - self._app.accounts.clear() - self._app.accounts.extend(list(self.cloud_cfg_handler.accounts.values())) + self._app.accounts[self._project.name] = \ + list(self.cloud_cfg_handler.accounts.values()) self.cloud_operdata_handler.delete_cloud_account(account_name) def on_cloud_account_added(self, account): self._log.debug("cloud account added") - self._app.accounts.clear() - self._app.accounts.extend(list(self.cloud_cfg_handler.accounts.values())) + self._app.accounts[self._project.name] = \ + list(self.cloud_cfg_handler.accounts.values()) self._log.debug("accounts: %s", self._app.accounts) self.cloud_operdata_handler.add_cloud_account(account) @asyncio.coroutine def register(self): - self.cloud_cfg_handler.register() + yield from self.cloud_cfg_handler.register() yield from self.cloud_operdata_handler.register() + def deregister(self): + self.cloud_cfg_handler.deregister() + self.cloud_operdata_handler.deregister() -class LaunchpadTasklet(rift.tasklets.Tasklet): - UPLOAD_MAX_BODY_SIZE = MAX_BODY_SIZE - UPLOAD_MAX_BUFFER_SIZE = MAX_BUFFER_SIZE - UPLOAD_PORT = "4567" - def __init__(self, *args, **kwargs): - super(LaunchpadTasklet, self).__init__(*args, **kwargs) - self.rwlog.set_category("rw-mano-log") - self.rwlog.set_subcategory("launchpad") +class LaunchpadProject(ManoProject): - self.app = None - self.server = None + def __init__(self, name, tasklet, **kw): + super(LaunchpadProject, self).__init__(tasklet.log, name) + self.update(tasklet) + self._app = kw['app'] - self.account_handler = None self.config_handler = None self.nsd_catalog_handler = None self.vld_catalog_handler = None @@ -331,14 +336,60 @@ class LaunchpadTasklet(rift.tasklets.Tasklet): self.cloud_handler = None self.datacenter_handler = None self.lp_config_handler = None - - self.vnfd_package_store = store.VnfdPackageFilesystemStore(self.log) - self.nsd_package_store = store.NsdPackageFilesystemStore(self.log) + self.account_handler = None self.nsd_catalog = dict() self.vld_catalog = dict() self.vnfd_catalog = dict() + @property + def dts(self): + return self._dts + + @property + def loop(self): + return self._loop + + @asyncio.coroutine + def register(self): + self.log.debug("creating NSD catalog handler for project {}".format(self.name)) + self.nsd_catalog_handler = NsdCatalogDtsHandler(self, self._app) + yield from self.nsd_catalog_handler.register() + + self.log.debug("creating VNFD catalog handler for project {}".format(self.name)) + self.vnfd_catalog_handler = VnfdCatalogDtsHandler(self, self._app) + yield from self.vnfd_catalog_handler.register() + + self.log.debug("creating datacenter handler for project {}".format(self.name)) + self.datacenter_handler = datacenters.DataCenterPublisher(self.log, self.dts, + self.loop, self) + yield from self.datacenter_handler.register() + + self.log.debug("creating cloud account handler for project {}".format(self.name)) + self.cloud_handler = CloudAccountHandlers(self.dts, self.log, self.log_hdl, + self.loop, self._app, self) + yield from self.cloud_handler.register() + + self.log.debug("creating config agent handler for project {}".format(self.name)) + self.config_handler = CfgAgentAccountHandlers(self.dts, self.log, self.log_hdl, + self.loop, self) + yield from self.config_handler.register() + + def deregister(self): + self.log.debug("De-register handlers for project: {}".format(self.name)) + self.config_handler.deregister() + self.cloud_handler.deregister() + self.datacenter_handler.deregister() + self.vnfd_catalog_handler.deregister() + self.nsd_catalog_handler.deregister() + + @asyncio.coroutine + def delete_prepare(self): + # TODO: Do we need this check + # if self.nsd_catalog or self.vnfd_catalog or self.vld_catalog: + # return False + return True + @property def cloud_accounts(self): if self.cloud_handler is None: @@ -346,6 +397,48 @@ class LaunchpadTasklet(rift.tasklets.Tasklet): return list(self.cloud_handler.cloud_cfg_handler.accounts.values()) + +class LaunchpadTasklet(rift.tasklets.Tasklet): + UPLOAD_MAX_BODY_SIZE = MAX_BODY_SIZE + UPLOAD_MAX_BUFFER_SIZE = MAX_BUFFER_SIZE + UPLOAD_PORT = "4567" + + def __init__(self, *args, **kwargs): + super(LaunchpadTasklet, self).__init__(*args, **kwargs) + self.rwlog.set_category("rw-mano-log") + self.rwlog.set_subcategory("launchpad") + + self.dts = None + self.project_handler = None + + self.vnfd_package_store = store.VnfdPackageFilesystemStore(self.log) + self.nsd_package_store = store.NsdPackageFilesystemStore(self.log) + + self.app = None + self.server = None + self.projects = {} + print("LP Tasklet init") + + def _get_project(project=None): + if project is None: + project = DEFAULT_PROJECT + + if project in self.projects: + return self.projects[project] + + msg = "Project {} not found".format(project) + self._log.error(msg) + raise LpProjectNotFound(msg) + + def nsd_catalog_get(self, project=None): + return self._get_project(project=project).nsd_catalog + + def vnfd_catalog_get(self, project=None): + return self._get_project(project=project).vnfd_catalog + + def get_cloud_accounts(self, project=None): + return self._get_project(project=project).cloud_accounts + def start(self): super(LaunchpadTasklet, self).start() self.log.info("Starting LaunchpadTasklet") @@ -368,56 +461,51 @@ class LaunchpadTasklet(rift.tasklets.Tasklet): self.log.exception("Caught Exception in LP stop") raise + def get_vnfd_catalog(self, project): + return self.projects[project].vnfd_catalog + + def get_nsd_catalog(self, project): + return self.projects[project].nsd_catalog + @asyncio.coroutine def init(self): - io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop) - self.app = uploader.UploaderApplication.from_tasklet(self) - yield from self.app.register() - - manifest = self.tasklet_info.get_pb_manifest() - ssl_cert = manifest.bootstrap_phase.rwsecurity.cert - ssl_key = manifest.bootstrap_phase.rwsecurity.key - ssl_options = { + try: + io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop) + self.app = uploader.UploaderApplication.from_tasklet(self) + yield from self.app.register() + + manifest = self.tasklet_info.get_pb_manifest() + ssl_cert = manifest.bootstrap_phase.rwsecurity.cert + ssl_key = manifest.bootstrap_phase.rwsecurity.key + ssl_options = { "certfile": ssl_cert, "keyfile": ssl_key, - } - - if manifest.bootstrap_phase.rwsecurity.use_ssl: - self.server = tornado.httpserver.HTTPServer( - self.app, - max_body_size=LaunchpadTasklet.UPLOAD_MAX_BODY_SIZE, - io_loop=io_loop, - ssl_options=ssl_options, - ) - - else: - self.server = tornado.httpserver.HTTPServer( - self.app, - max_body_size=LaunchpadTasklet.UPLOAD_MAX_BODY_SIZE, - io_loop=io_loop, - ) - - self.log.debug("creating NSD catalog handler") - self.nsd_catalog_handler = NsdCatalogDtsHandler(self, self.app) - yield from self.nsd_catalog_handler.register() - - self.log.debug("creating VNFD catalog handler") - self.vnfd_catalog_handler = VnfdCatalogDtsHandler(self, self.app) - yield from self.vnfd_catalog_handler.register() - - self.log.debug("creating datacenter handler") - self.datacenter_handler = datacenters.DataCenterPublisher(self.log, self.dts, self.loop) - yield from self.datacenter_handler.register() + } + + if manifest.bootstrap_phase.rwsecurity.use_ssl: + self.server = tornado.httpserver.HTTPServer( + self.app, + max_body_size=LaunchpadTasklet.UPLOAD_MAX_BODY_SIZE, + io_loop=io_loop, + ssl_options=ssl_options, + ) - self.log.debug("creating cloud account handler") - self.cloud_handler = CloudAccountHandlers( - self.dts, self.log, self.log_hdl, self.loop, self.app + else: + self.server = tornado.httpserver.HTTPServer( + self.app, + max_body_size=LaunchpadTasklet.UPLOAD_MAX_BODY_SIZE, + io_loop=io_loop, ) - yield from self.cloud_handler.register() - self.log.debug("creating config agent handler") - self.config_handler = CfgAgentAccountHandlers(self.dts, self.log, self.log_hdl, self.loop) - yield from self.config_handler.register() + self.log.debug("Registering project handler") + print("PJ: Registering project handler") + self.project_handler = ProjectHandler(self, LaunchpadProject, + app=self.app) + self.project_handler.register() + + except Exception as e: + self.log.error("Exception : {}".format(e)) + self.log.exception(e) @asyncio.coroutine def run(self):