X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwnsmtasklet.py;h=ea17e6015ecc2113699bacd5dcd943b3a64fbe89;hb=a3bb91f092d378448cb870eccd45d43865de143c;hp=69fac685cdbbc09ad0d4ee2a26c9479b0f14fca8;hpb=5a660df2c93308dc82a1bd31b8eb000558910ee9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index 69fac685..ea17e601 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -35,7 +35,7 @@ from enum import Enum import gi gi.require_version('RwYang', '1.0') -gi.require_version('RwNsdYang', '1.0') +gi.require_version('ProjectNsdYang', '1.0') gi.require_version('RwDts', '1.0') gi.require_version('RwNsmYang', '1.0') gi.require_version('RwNsrYang', '1.0') @@ -46,7 +46,7 @@ from gi.repository import ( RwYang, RwNsrYang, NsrYang, - NsdYang, + ProjectNsdYang as NsdYang, RwVlrYang, VnfrYang, RwVnfrYang, @@ -61,6 +61,12 @@ import rift.tasklets import rift.mano.ncclient import rift.mano.config_data.config import rift.mano.dts as mano_dts +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + get_add_delete_update_cfgs, + DEFAULT_PROJECT, + ) from . import rwnsm_conman as conman from . import cloud @@ -225,7 +231,7 @@ class VnffgRecord(object): "sdn_account": self._sdn_account_name, "operational_status": 'init', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) elif self._vnffgr_state == VnffgRecordState.TERMINATED: vnffgr_dict = {"id": self._vnffgr_id, "vnffgd_id_ref": self._vnffgd_msg.id, @@ -233,7 +239,7 @@ class VnffgRecord(object): "sdn_account": self._sdn_account_name, "operational_status": 'terminated', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) else: try: vnffgr = self._vnffgmgr.fetch_vnffgr(self._vnffgr_id) @@ -246,7 +252,7 @@ class VnffgRecord(object): "sdn_account": self._sdn_account_name, "operational_status": 'failed', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) return vnffgr @@ -258,7 +264,7 @@ class VnffgRecord(object): "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) for rsp in self._vnffgd_msg.rsp: vnffgr_rsp = vnffgr.rsp.add() vnffgr_rsp.id = str(uuid.uuid4()) @@ -270,9 +276,11 @@ class VnffgRecord(object): vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref] self._log.debug("VNFD message during VNFFG instantiation is %s",vnfd) if len(vnfd) > 0 and vnfd[0].has_field('service_function_type'): - self._log.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type) + self._log.debug("Service Function Type for VNFD ID %s is %s", + rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type) else: - self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref.vnfd_id_ref) + self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain", + rsp_cp_ref.vnfd_id_ref) continue vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add() @@ -293,7 +301,8 @@ class VnffgRecord(object): self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) if vnfr.operational_status == 'failed': self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % + (self.id, vnfr.id)) yield from asyncio.sleep(2, loop=self._loop) vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) @@ -320,7 +329,8 @@ class VnffgRecord(object): rsp_id_ref = _rsp[0].id rsp_name = _rsp[0].name else: - self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id) + self._log.error("RSP with ID %s not found during classifier creation for classifier id %s", + vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id) continue vnffgr_classifier = vnffgr.classifier.add() vnffgr_classifier.id = vnffgd_classifier.id @@ -344,7 +354,8 @@ class VnffgRecord(object): self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) if vnfr.operational_status == 'failed': self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % + (self.id, vnfr.id)) yield from asyncio.sleep(2, loop=self._loop) vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) @@ -357,8 +368,9 @@ class VnffgRecord(object): for ext_intf in vdu.external_interface: if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref: vnffgr_classifier.vm_id = vdu.vim_id - self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id, - vnfr_cp_ref.connection_point_params.vm_id) + self._log.debug("VIM ID for CP %s in VNFR %s is %s", + cp.name,nsr_vnfr.id, + vnfr_cp_ref.connection_point_params.vm_id) break self._log.info("VNFFGR msg to be sent is %s", vnffgr) @@ -459,7 +471,9 @@ class VirtualLinkRecord(object): XPATH = "D,/vlr:vlr-catalog/vlr:vlr" @staticmethod @asyncio.coroutine - def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False): + def create_record(dts, log, loop, project, nsr_name, vld_msg, + cloud_account_name, om_datacenter, ip_profile, + nsr_id, restart_mode=False): """Creates a new VLR object based on the given data. If restart mode is enabled, then we look for existing records in the @@ -472,6 +486,7 @@ class VirtualLinkRecord(object): dts, log, loop, + project, nsr_name, vld_msg, cloud_account_name, @@ -482,7 +497,7 @@ class VirtualLinkRecord(object): if restart_mode: res_iter = yield from dts.query_read( - "D,/vlr:vlr-catalog/vlr:vlr", + project.add_project("D,/vlr:vlr-catalog/vlr:vlr"), rwdts.XactFlag.MERGE) for fut in res_iter: @@ -498,10 +513,12 @@ class VirtualLinkRecord(object): return vlr_obj - def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id): + def __init__(self, dts, log, loop, project, nsr_name, vld_msg, + cloud_account_name, om_datacenter, ip_profile, nsr_id): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_name = nsr_name self._vld_msg = vld_msg self._cloud_account_name = cloud_account_name @@ -517,7 +534,8 @@ class VirtualLinkRecord(object): @property def xpath(self): """ path for this object """ - return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self._vlr_id) + return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']". + format(self._vlr_id)) @property def id(self): @@ -615,7 +633,7 @@ class VirtualLinkRecord(object): vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict() vlr_dict.update(vld_copy_dict) - vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict) + vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict) return vlr def reset_id(self, vlr_id): @@ -623,7 +641,7 @@ class VirtualLinkRecord(object): def create_nsr_vlr_msg(self, vnfrs): """ The VLR message""" - nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr() + nsr_vlr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vlr() nsr_vlr.vlr_ref = self._vlr_id nsr_vlr.assigned_subnet = self.assigned_subnet nsr_vlr.cloud_account = self.cloud_account_name @@ -721,7 +739,7 @@ class VirtualNetworkFunctionRecord(object): @staticmethod @asyncio.coroutine - def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name, + def create_record(dts, log, loop, project, vnfd, const_vnfd_msg, nsd_id, nsr_name, cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id, placement_groups, restart_mode=False): """Creates a new VNFR object based on the given data. @@ -736,6 +754,7 @@ class VirtualNetworkFunctionRecord(object): dts, log, loop, + project, vnfd, const_vnfd_msg, nsd_id, @@ -750,7 +769,7 @@ class VirtualNetworkFunctionRecord(object): if restart_mode: res_iter = yield from dts.query_read( - "D,/vnfr:vnfr-catalog/vnfr:vnfr", + project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr"), rwdts.XactFlag.MERGE) for fut in res_iter: @@ -767,6 +786,7 @@ class VirtualNetworkFunctionRecord(object): dts, log, loop, + project, vnfd, const_vnfd_msg, nsd_id, @@ -781,6 +801,7 @@ class VirtualNetworkFunctionRecord(object): self._dts = dts self._log = log self._loop = loop + self._project = project self._vnfd = vnfd self._const_vnfd_msg = const_vnfd_msg self._nsd_id = nsd_id @@ -820,7 +841,8 @@ class VirtualNetworkFunctionRecord(object): @property def xpath(self): """ VNFR xpath """ - return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self.id) + return self._project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']" + .format(self.id)) @property def vnfr_msg(self): @@ -830,7 +852,9 @@ class VirtualNetworkFunctionRecord(object): @property def const_vnfr_msg(self): """ VNFR message """ - return RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(vnfr_id=self.id,cloud_account=self.cloud_account_name,om_datacenter=self._om_datacenter_name) + return RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConstituentVnfrRef( + vnfr_id=self.id, cloud_account=self.cloud_account_name, + om_datacenter=self._om_datacenter_name) @property def vnfd(self): @@ -896,7 +920,8 @@ class VirtualNetworkFunctionRecord(object): @staticmethod def vnfr_xpath(vnfr): """ Get the VNFR path from VNFR """ - return (VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']").format(vnfr.id) + return (self._project.add_project(VirtualNetworkFunctionRecord.XPATH) + + "[vnfr:id = '{}']").format(vnfr.id) @property def config_type(self): @@ -956,9 +981,10 @@ class VirtualNetworkFunctionRecord(object): } vnfr_dict.update(vnfd_copy_dict) - vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict) - vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict(), - ignore_missing_keys=True) + vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(vnfr_dict) + vnfr.vnfd = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd. \ + from_dict(self.vnfd.as_dict(), + ignore_missing_keys=True) vnfr.member_vnf_index_ref = self.member_vnf_index vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict()) @@ -981,7 +1007,7 @@ class VirtualNetworkFunctionRecord(object): format(self.name, self.vnfr_msg)) yield from self._dts.query_update( self.xpath, - rwdts.XactFlag.TRACE, + 0, #rwdts.XactFlag.TRACE, self.vnfr_msg ) @@ -1068,8 +1094,10 @@ class VirtualNetworkFunctionRecord(object): return None # For every connection point in the VNFD fill in the identifier + self._log.debug("Add connection point for VNF %s: %s", + self.vnfr_msg.name, self._vnfd.connection_point) for conn_p in self._vnfd.connection_point: - cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint() + cpr = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint() cpr.name = conn_p.name cpr.type_yang = conn_p.type_yang if conn_p.has_field('port_security_enabled'): @@ -1099,9 +1127,6 @@ class VirtualNetworkFunctionRecord(object): self._log.info("Created VNF with xpath %s and vnfr %s", self.xpath, self.vnfr_msg) - self._log.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s", - self.xpath, self._vnfd, self.vnfr_msg) - @asyncio.coroutine def update_state(self, vnfr_msg): """ Update this VNFR""" @@ -1222,7 +1247,7 @@ class NetworkServiceStatus(object): event_list = [] idx = 1 for entry in self._events: - event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents() + event = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_OperationalEvents() event.id = idx idx += 1 event.timestamp, event.event, event.description, event.details = entry @@ -1234,7 +1259,8 @@ class NetworkServiceRecord(object): """ Network service record """ XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr" - def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False, + def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, + sdn_account_name, key_pairs, project, restart_mode=False, vlr_handler=None): self._dts = dts self._log = log @@ -1244,6 +1270,7 @@ class NetworkServiceRecord(object): self._nsm_plugin = nsm_plugin self._sdn_account_name = sdn_account_name self._vlr_handler = vlr_handler + self._project = project self._nsd = None self._nsr_msg = None @@ -1277,7 +1304,7 @@ class NetworkServiceRecord(object): self.set_state(NetworkServiceRecordState.INIT) - self.substitute_input_parameters = InputParameterSubstitution(self._log) + self.substitute_input_parameters = InputParameterSubstitution(self._log, self._project) @property def nsm_plugin(self): @@ -1393,7 +1420,7 @@ class NetworkServiceRecord(object): for group_info in self._nsr_cfg_msg.nsd_placement_group_maps: if group_info.placement_group_ref == input_group.name: - group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo() + group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo() group_dict = {k:v for k,v in group_info.as_dict().items() if k != 'placement_group_ref'} for param in copy_dict: @@ -1416,7 +1443,7 @@ class NetworkServiceRecord(object): """ Fetch Cloud Account for the passed vnfd id """ if self._nsr_cfg_msg.vnf_cloud_account_map: vim_accounts = [(vnf.cloud_account,vnf.om_datacenter) for vnf in self._nsr_cfg_msg.vnf_cloud_account_map \ - if vnfd_member_index == vnf.member_vnf_index_ref] + if str(vnfd_member_index) == vnf.member_vnf_index_ref] if vim_accounts and vim_accounts[0]: return vim_accounts[0] return (self.cloud_account_name,self.om_datacenter_name) @@ -1465,14 +1492,16 @@ class NetworkServiceRecord(object): def vlr_uptime_update(self, vlr): try: - vlr_ = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict({'id': vlr.id}) + vlr_ = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict({'id': vlr.id}) while True: vlr_.uptime = int(time.time()) - vlr._create_time - yield from self._vlr_handler.update(None, VirtualLinkRecord.vlr_xpath(vlr), vlr_) + xpath = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) + yield from self._vlr_handler.update(None, xpath, vlr_) yield from asyncio.sleep(2, loop=self._loop) except asyncio.CancelledError: self._log.debug("Received cancellation request for vlr_uptime_update task") - yield from self._vlr_handler.delete(None, VirtualLinkRecord.vlr_xpath(vlr)) + xpath = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) + yield from self._vlr_handler.delete(None, xpath) @asyncio.coroutine @@ -1850,6 +1879,7 @@ class NetworkServiceRecord(object): self._dts, self._log, self._loop, + self._project, self.name, vld, cloud_account, @@ -2008,7 +2038,7 @@ class NetworkServiceRecord(object): for group in self.nsd_msg.placement_groups: for member_vnfd in group.member_vnfd: if (member_vnfd.vnfd_id_ref == vnfd_msg.id) and \ - (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index): + (member_vnfd.member_vnf_index_ref == str(const_vnfd.member_vnf_index)): group_info = self.resolve_placement_group_cloud_construct(group) if group_info is None: self._log.info("Could not resolve cloud-construct for placement group: %s", group.name) @@ -2034,6 +2064,7 @@ class NetworkServiceRecord(object): vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts, self._log, self._loop, + self._project, vnfd_msg, const_vnfd, self.nsd_id, @@ -2189,23 +2220,23 @@ class NetworkServiceRecord(object): @property def nsr_xpath(self): """ Returns the xpath associated with this NSR """ - return( + return self._project.add_project(( "D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" - ).format(self.id) + ).format(self.id)) @staticmethod def xpath_from_nsr(nsr): """ Returns the xpath associated with this NSR op data""" - return (NetworkServiceRecord.XPATH + - "[nsr:ns-instance-config-ref = '{}']").format(nsr.id) + return self._project.add_project((NetworkServiceRecord.XPATH + + "[nsr:ns-instance-config-ref = '{}']").format(nsr.id)) @property def nsd_xpath(self): """ Return NSD config xpath.""" - return( - "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']" - ).format(self.nsd_id) + return self._project.add_project(( + "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id = '{}']" + ).format(self.nsd_id)) @asyncio.coroutine def instantiate(self, config_xact): @@ -2487,7 +2518,7 @@ class NetworkServiceRecord(object): def create_msg(self): """ The network serice record as a message """ nsr_dict = {"ns_instance_config_ref": self.id} - nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(nsr_dict) + nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict(nsr_dict) #nsr.cloud_account = self.cloud_account_name nsr.sdn_account = self._sdn_account_name nsr.name_ref = self.name @@ -2501,7 +2532,7 @@ class NetworkServiceRecord(object): nsr.uptime = int(time.time()) - self._create_time for cfg_prim in self.nsd_msg.service_primitive: - cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( + cfg_prim = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( cfg_prim.as_dict()) nsr.service_primitive.append(cfg_prim) @@ -2641,7 +2672,7 @@ class InputParameterSubstitution(object): This class is responsible for substituting input parameters into an NSD. """ - def __init__(self, log): + def __init__(self, log, project): """Create an instance of InputParameterSubstitution Arguments: @@ -2649,6 +2680,27 @@ class InputParameterSubstitution(object): """ self.log = log + self.project = project + + def _fix_xpath(self, xpath): + # Fix the parameter.xpath to include project and correct namespace + self.log.error("Provided xpath: {}".format(xpath)) + #Split the xpath at the / + attrs = xpath.split('/') + new_xp = attrs[0] + for attr in attrs[1:]: + new_ns = 'project-nsd' + name = attr + if ':' in attr: + # Includes namespace + ns, name = attr.split(':', 2) + if ns == "rw-nsd": + ns = "rw-project-nsd" + + new_xp = new_xp + '/' + new_ns + ':' + name + + self.log.error("Updated xpath: {}".format(new_xp)) + return new_xp def __call__(self, nsd, nsr_config): """Substitutes input parameters from the NSR config into the NSD @@ -2686,7 +2738,8 @@ class InputParameterSubstitution(object): ) try: - xpath.setxattr(nsd, param.xpath, param.value) + xp = self._fix_xpath(param.xpath) + xpath.setxattr(nsd, xp, param.value) except Exception as e: self.log.exception(e) @@ -2747,7 +2800,9 @@ class NetworkServiceDescriptor(object): @staticmethod def path_for_id(nsd_id): """ Return path for the passed nsd_id""" - return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}'".format(nsd_id) + return self._nsm._project.add_project( + "C,/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id = '{}'". + format(nsd_id)) def path(self): """ Return the message associated with this NetworkServiceDescriptor""" @@ -2760,7 +2815,7 @@ class NetworkServiceDescriptor(object): class NsdDtsHandler(object): """ The network service descriptor DTS handler """ - XPATH = "C,/nsd:nsd-catalog/nsd:nsd" + XPATH = "C,/project-nsd:nsd-catalog/project-nsd:nsd" def __init__(self, dts, log, loop, nsm): self._dts = dts @@ -2769,6 +2824,7 @@ class NsdDtsHandler(object): self._nsm = nsm self._regh = None + self._project = nsm._project @property def regh(self): @@ -2779,16 +2835,27 @@ class NsdDtsHandler(object): def register(self): """ Register for Nsd create/update/delete/read requests from dts """ + if self._regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL self._log.debug("Got nsd apply cfg (xact:%s) (action:%s)", xact, action) - # Create/Update an NSD record - for cfg in self._regh.get_xact_elements(xact): - # Only interested in those NSD cfgs whose ID was received in prepare callback - if cfg.id in scratch.get('nsds', []) or is_recovery: - self._nsm.update_nsd(cfg) + + if self._regh: + # Create/Update an NSD record + for cfg in self._regh.get_xact_elements(xact): + # Only interested in those NSD cfgs whose ID was received in prepare callback + if cfg.id in scratch.get('nsds', []) or is_recovery: + self._nsm.update_nsd(cfg) + + else: + self._log.error("No reg handle for {} for project {}". + format(self.__class__, self._project.name)) scratch.pop('nsds', None) @@ -2806,7 +2873,7 @@ class NsdDtsHandler(object): except Exception as e: self._log.error("Exception in cleaning up NSD libs {}: {}". format(nsd_id, e)) - self._log.excpetion(e) + self._log.exception(e) @asyncio.coroutine def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch): @@ -2846,14 +2913,21 @@ class NsdDtsHandler(object): # Need a list in scratch to store NSDs to create/update later # acg._scratch['nsds'] = list() self._regh = acg.register( - xpath=NsdDtsHandler.XPATH, + xpath=self._project.add_project(NsdDtsHandler.XPATH), flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, on_prepare=on_prepare) + def deregister(self): + self._log.debug("De-register NSD handler for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + class VnfdDtsHandler(object): """ DTS handler for VNFD config changes """ - XPATH = "C,/vnfd:vnfd-catalog/vnfd:vnfd" + XPATH = "C,/project-vnfd:vnfd-catalog/project-vnfd:vnfd" def __init__(self, dts, log, loop, nsm): self._dts = dts @@ -2861,6 +2935,7 @@ class VnfdDtsHandler(object): self._loop = loop self._nsm = nsm self._regh = None + self._project = nsm._project @property def regh(self): @@ -2871,21 +2946,31 @@ class VnfdDtsHandler(object): def register(self): """ Register for VNFD configuration""" + if self._regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" self._log.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)", xact, action, scratch) - # Create/Update a VNFD record - for cfg in self._regh.get_xact_elements(xact): - # Only interested in those VNFD cfgs whose ID was received in prepare callback - if cfg.id in scratch.get('vnfds', []): - self._nsm.update_vnfd(cfg) + if self._regh: + # Create/Update a VNFD record + for cfg in self._regh.get_xact_elements(xact): + # Only interested in those VNFD cfgs whose ID was received in prepare callback + if cfg.id in scratch.get('vnfds', []): + self._nsm.update_vnfd(cfg) - for cfg in self._regh.elements: - if cfg.id in scratch.get('deleted_vnfds', []): - yield from self._nsm.delete_vnfd(cfg.id) + for cfg in self._regh.elements: + if cfg.id in scratch.get('deleted_vnfds', []): + yield from self._nsm.delete_vnfd(cfg.id) + + else: + self._log.error("Reg handle none for {} in project {}". + format(self.__class__, self._project)) scratch.pop('vnfds', None) scratch.pop('deleted_vnfds', None) @@ -2911,20 +2996,28 @@ class VnfdDtsHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) + xpath = self._project.add_project(VnfdDtsHandler.XPATH) self._log.debug( - "Registering for VNFD config using xpath: %s", - VnfdDtsHandler.XPATH, - ) + "Registering for VNFD config using xpath {} for project {}" + .format(xpath, self._project)) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: # Need a list in scratch to store VNFDs to create/update later # acg._scratch['vnfds'] = list() # acg._scratch['deleted_vnfds'] = list() self._regh = acg.register( - xpath=VnfdDtsHandler.XPATH, + xpath=xpath, flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY, on_prepare=on_prepare) + def deregister(self): + self._log.debug("De-register VNFD handler for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + + class NsrRpcDtsHandler(object): """ The network service instantiation RPC DTS handler """ EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service" @@ -2946,7 +3039,9 @@ class NsrRpcDtsHandler(object): self._ns_regh = None self._manager = None - self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + 'config/ns-instance-config' + self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + \ + 'config/project/{}/ns-instance-config'. \ + format(self._nsm._project.name) self._model = RwYang.Model.create_libncx() self._model.load_schema_ypbc(RwNsrYang.get_schema()) @@ -2994,24 +3089,39 @@ class NsrRpcDtsHandler(object): def _apply_ns_instance_config(self,payload_dict): #self._log.debug("At apply NS instance config with payload %s",payload_dict) - req_hdr= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'} - response=requests.post(self._nsr_config_url, headers=req_hdr, auth=('admin', 'admin'),data=payload_dict,verify=False) + req_hdr= {'accept':'application/vnd.yang.data+json', + 'content-type':'application/vnd.yang.data+json'} + response=requests.post(self._nsr_config_url, headers=req_hdr, + auth=('admin', 'admin'),data=payload_dict,verify=False) return response @asyncio.coroutine def register(self): """ Register for NS monitoring read from dts """ + if self._ns_regh: + self._log.warning("RPC already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_ns_config_prepare(xact_info, action, ks_path, msg): """ prepare callback from dts start-network-service""" assert action == rwdts.QueryAction.RPC rpc_ip = msg + + if not self._nsm._project.rpc_check(msg, xact_info=xact_info): + return + rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({ - "nsr_id":str(uuid.uuid4()) - }) + "nsr_id":str(uuid.uuid4()), + "project_name": msg.prject_name, + }) - if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): - self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip)) + if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and + ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): + self._log.error("Mandatory parameters name or nsd_ref or " + + "cloud account not found in start-network-service {}". + format(rpc_ip)) self._log.debug("start-network-service RPC input: {}".format(rpc_ip)) @@ -3030,11 +3140,11 @@ class NsrRpcDtsHandler(object): ns_instance_config_dict = {"id":rpc_op.nsr_id, "admin_status":"ENABLED"} ns_instance_config_copy_dict = {k:v for k, v in rpc_ip.as_dict().items() - if k in RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields} + if k in RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr().fields} ns_instance_config_dict.update(ns_instance_config_copy_dict) - ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict) - ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd() + ns_instance_config = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict) + ns_instance_config.nsd = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_Nsd() ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict()) payload_dict = ns_instance_config.to_json(self._model) @@ -3077,6 +3187,13 @@ class NsrRpcDtsHandler(object): flags=rwdts.Flag.PUBLISHER, ) + def deregister(self): + self._log.debug("De-register NSR RPC for project {}". + format(self._nsm._project.name)) + if self._ns_regh: + self._ns_regh.deregister() + self._ns_regh = None + class NsrDtsHandler(object): """ The network service DTS handler """ @@ -3089,6 +3206,7 @@ class NsrDtsHandler(object): self._log = log self._loop = loop self._nsm = nsm + self._project = self._nsm._project self._nsr_regh = None self._scale_regh = None @@ -3103,13 +3221,18 @@ class NsrDtsHandler(object): def register(self): """ Register for Nsr create/update/delete/read requests from dts """ + if self._nsr_regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + def nsr_id_from_keyspec(ks): - nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks) + nsr_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks) nsr_id = nsr_path_entry.key00.id return nsr_id def group_name_from_keyspec(ks): - group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks) + group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks) group_name = group_path_entry.key00.scaling_group_name_ref return group_name @@ -3200,32 +3323,6 @@ class NsrDtsHandler(object): for vld in vl_delta["deleted"]: yield from self._nsm.nsr_terminate_vl(nsr_id, vld) - def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch): - # Unfortunately, it is currently difficult to figure out what has exactly - # changed in this xact without Pbdelta support (RIFT-4916) - # As a workaround, we can fetch the pre and post xact elements and - # perform a comparison to figure out adds/deletes/updates - xact_cfgs = list(dts_member_reg.get_xact_elements(xact)) - curr_cfgs = list(dts_member_reg.elements) - - xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs} - curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs} - - # Find Adds - added_keys = set(xact_key_map) - set(curr_key_map) - added_cfgs = [xact_key_map[key] for key in added_keys] - - # Find Deletes - deleted_keys = set(curr_key_map) - set(xact_key_map) - deleted_cfgs = [curr_key_map[key] for key in deleted_keys] - - # Find Updates - updated_keys = set(curr_key_map) & set(xact_key_map) - updated_cfgs = [xact_key_map[key] for key in updated_keys - if xact_key_map[key] != curr_key_map[key]] - - return added_cfgs, deleted_cfgs, updated_cfgs - def get_nsr_key_pairs(dts_member_reg, xact): key_pairs = {} for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True): @@ -3274,24 +3371,36 @@ class NsrDtsHandler(object): def begin_instantiation(nsr): # Begin instantiation self._log.info("Beginning NS instantiation: %s", nsr.id) - yield from self._nsm.instantiate_ns(nsr.id, xact) + try: + yield from self._nsm.instantiate_ns(nsr.id, xact) + except Exception as e: + self._log.exception("NS instantiation: {}".format(e)) + raise e self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)", xact, action, scratch) if action == rwdts.AppconfAction.INSTALL and xact.id is None: key_pairs = [] - for element in self._key_pair_regh.elements: - key_pairs.append(element) - for element in self._nsr_regh.elements: - nsr = handle_create_nsr(element, key_pairs, restart_mode=True) - self._loop.create_task(begin_instantiation(nsr)) + if self._key_pair_regh: + for element in self._key_pair_regh.elements: + key_pairs.append(element) + else: + self._log.error("Reg handle none for key pair in project {}". + format(self._project)) + + if self._nsr_regh: + for element in self._nsr_regh.elements: + nsr = handle_create_nsr(element, key_pairs, restart_mode=True) + self._loop.create_task(begin_instantiation(nsr)) + else: + self._log.error("Reg handle none for NSR in project {}". + format(self._project)) (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh, xact, - "id", - scratch) + "id") self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs, deleted_msgs, updated_msgs) @@ -3402,24 +3511,40 @@ class NsrDtsHandler(object): acg.handle.prepare_complete_ok(xact_info.handle) - self._log.debug("Registering for NSR config using xpath: %s", - NsrDtsHandler.NSR_XPATH) + xpath = self._project.add_project(NsrDtsHandler.NSR_XPATH) + self._log.debug("Registering for NSR config using xpath: {}". + format(xpath)) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: - self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, - on_prepare=on_prepare) + self._nsr_regh = acg.register( + xpath=xpath, + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, + on_prepare=on_prepare + ) self._scale_regh = acg.register( - xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE, - ) + xpath=self._project.add_project(NsrDtsHandler.SCALE_INSTANCE_XPATH), + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE, + ) self._key_pair_regh = acg.register( - xpath=NsrDtsHandler.KEY_PAIR_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, - ) + xpath=self._project.add_project(NsrDtsHandler.KEY_PAIR_XPATH), + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, + ) + + def deregister(self): + self._log.debug("De-register NSR config for project {}". + format(self._project.name)) + if self._nsr_regh: + self._nsr_regh.deregister() + self._nsr_regh = None + if self._scale_regh: + self._scale_regh.deregister() + self._scale_regh = None + if self._key_pair_regh: + self._key_pair_regh.deregister() + self._key_pair_regh = None class NsrOpDataDtsHandler(object): @@ -3431,6 +3556,8 @@ class NsrOpDataDtsHandler(object): self._log = log self._loop = loop self._nsm = nsm + + self._project = nsm._project self._regh = None @property @@ -3446,39 +3573,55 @@ class NsrOpDataDtsHandler(object): @asyncio.coroutine def register(self): """ Register for Nsr op data publisher registration""" - self._log.debug("Registering Nsr op data path %s as publisher", - NsrOpDataDtsHandler.XPATH) + if self._regh: + self._log.warning("NSR op data handler already registered for project {}". + format(self._project.name)) + return + + xpath = self._project.add_project(NsrOpDataDtsHandler.XPATH) + self._log.debug("Registering Nsr op data path {} as publisher". + format(xpath)) hdl = rift.tasklets.DTS.RegistrationHandler() handlers = rift.tasklets.Group.Handler() with self._dts.group_create(handler=handlers) as group: - self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH, + self._regh = group.register(xpath=xpath, handler=hdl, flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ | rwdts.Flag.DATASTORE) + def deregister(self): + self._log.debug("De-register NSR opdata for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + @asyncio.coroutine - def create(self, path, msg): + def create(self, xpath, msg): """ Create an NS record in DTS with the path and message """ + path = self._project.add_project(xpath) self._log.debug("Creating NSR %s:%s", path, msg) self.regh.create_element(path, msg) self._log.debug("Created NSR, %s:%s", path, msg) @asyncio.coroutine - def update(self, path, msg, flags=rwdts.XactFlag.REPLACE): + def update(self, xpath, msg, flags=rwdts.XactFlag.REPLACE): """ Update an NS record in DTS with the path and message """ + path = self._project.add_project(xpath) self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, self.regh) self.regh.update_element(path, msg, flags) self._log.debug("Updated NSR, %s:%s", path, msg) @asyncio.coroutine - def delete(self, path): + def delete(self, xpath): """ Update an NS record in DTS with the path and message """ + path = self._project.add_project(xpath) self._log.debug("Deleting NSR path:%s", path) self.regh.delete_element(path) self._log.debug("Deleted NSR path:%s", path) @@ -3509,6 +3652,11 @@ class VnfrDtsHandler(object): @asyncio.coroutine def register(self): """ Register for vnfr create/update/delete/ advises from dts """ + if self._regh: + self._log.warning("VNFR DTS handler already registered for project {}". + format(self._project.name)) + return + def on_commit(xact_info): """ The transaction has been committed """ @@ -3524,16 +3672,17 @@ class VnfrDtsHandler(object): xact_info, action, ks_path, msg ) - schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema() + schema = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema() path_entry = schema.keyspec_to_entry(ks_path) - if path_entry.key00.id not in self._nsm._vnfrs: - self._log.error("%s request for non existent record path %s", - action, xpath) + if not path_entry or (path_entry.key00.id not in self._nsm._vnfrs): + # Check if this is a monitoring param xpath + if 'vnfr:monitoring-param' not in xpath: + self._log.error("%s request for non existent record path %s", + action, xpath) xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath) return - self._log.debug("Deleting VNFR with id %s", path_entry.key00.id) if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE: yield from self._nsm.update_vnfr(msg) elif action == rwdts.QueryAction.DELETE: @@ -3548,10 +3697,17 @@ class VnfrDtsHandler(object): hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit, on_prepare=on_prepare,) with self._dts.group_create() as group: - self._regh = group.register(xpath=VnfrDtsHandler.XPATH, + self._regh = group.register(xpath=self._nsm._project.add_project( + VnfrDtsHandler.XPATH), handler=hdl, flags=(rwdts.Flag.SUBSCRIBER),) + def deregister(self): + self._log.debug("De-register VNFR for project {}". + format(self._nsm._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None class NsdRefCountDtsHandler(object): """ The NSD Ref Count DTS handler """ @@ -3578,6 +3734,11 @@ class NsdRefCountDtsHandler(object): @asyncio.coroutine def register(self): """ Register for NSD ref count read from dts """ + if self._regh: + self._log.warning("NSD ref DTS handler already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): @@ -3585,7 +3746,7 @@ class NsdRefCountDtsHandler(object): xpath = ks_path.to_xpath(RwNsrYang.get_schema()) if action == rwdts.QueryAction.READ: - schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount.schema() + schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_NsdRefCount.schema() path_entry = schema.keyspec_to_entry(ks_path) nsd_list = yield from self._nsm.get_nsd_refcount(path_entry.key00.nsd_id_ref) for xpath, msg in nsd_list: @@ -3598,19 +3759,28 @@ class NsdRefCountDtsHandler(object): hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) with self._dts.group_create() as group: - self._regh = group.register(xpath=NsdRefCountDtsHandler.XPATH, + self._regh = group.register(xpath=self._nsm._project.add_project( + NsdRefCountDtsHandler.XPATH), handler=hdl, flags=rwdts.Flag.PUBLISHER,) + def deregister(self): + self._log.debug("De-register NSD Ref count for project {}". + format(self._nsm._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + class NsManager(object): """ The Network Service Manager class""" - def __init__(self, dts, log, loop, + def __init__(self, dts, log, loop, project, nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector, vnffgmgr, vnfd_pub_handler, cloud_account_handler): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_handler = nsr_handler self._vnfr_pub_handler = vnfr_handler self._vlr_pub_handler = vlr_handler @@ -3641,8 +3811,9 @@ class NsManager(object): VnfrDtsHandler(dts, log, loop, self), NsdRefCountDtsHandler(dts, log, loop, self), NsrDtsHandler(dts, log, loop, self), - ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback), - NsrRpcDtsHandler(dts,log,loop,self), + ScalingRpcHandler(log, dts, loop, self._project, + self.scale_rpc_callback), + NsrRpcDtsHandler(dts, log, loop, self), self._vnfd_dts_handler, self.cfgmgr_obj, ] @@ -3718,6 +3889,11 @@ class NsManager(object): for dts_handle in self._dts_handlers: yield from dts_handle.register() + def deregister(self): + """ Register all static DTS handlers """ + for dts_handle in self._dts_handlers: + dts_handle.deregister() + def get_ns_by_nsr_id(self, nsr_id): """ get NSR by nsr id """ @@ -3757,12 +3933,16 @@ class NsManager(object): msg : RPC input action : Scaling Action """ - ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance - ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup + ScalingGroupInstance = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance + ScalingGroup = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup + + xpath = self._project.add_project( + ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]'). + format(msg.nsr_id_ref)) - xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format( - msg.nsr_id_ref) - instance = ScalingGroupInstance.from_dict({"id": msg.instance_id}) + instance = ScalingGroupInstance.from_dict({ + "id": msg.instance_id, + "project_name": self._project.name,}) @asyncio.coroutine def get_nsr_scaling_group(): @@ -3863,6 +4043,7 @@ class NsManager(object): nsr_msg, sdn_account_name, key_pairs, + self._project, restart_mode=restart_mode, vlr_handler=self._ro_plugin_selector._records_publisher._vlr_pub_hdlr ) @@ -3921,7 +4102,7 @@ class NsManager(object): @asyncio.coroutine def get_nsr_config(self, nsd_id): - xpath = "C,/nsr:ns-instance-config" + xpath = self._project.add_project("C,/nsr:ns-instance-config") results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) for result in results: @@ -4100,18 +4281,18 @@ class NsManager(object): def nsd_refcount_xpath(nsd_id): """ xpath for ref count entry """ - return (NsdRefCountDtsHandler.XPATH + + return (self._project.add_project(NsdRefCountDtsHandler.XPATH) + "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id) nsd_list = [] if nsd_id is None or nsd_id == "": for nsd in self._nsds.values(): - nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount() + nsd_msg = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_NsdRefCount() nsd_msg.nsd_id_ref = nsd.id nsd_msg.instance_ref_count = nsd.ref_count nsd_list.append((nsd_refcount_xpath(nsd.id), nsd_msg)) elif nsd_id in self._nsds: - nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount() + nsd_msg = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_NsdRefCount() nsd_msg.nsd_id_ref = self._nsds[nsd_id].id nsd_msg.instance_ref_count = self._nsds[nsd_id].ref_count nsd_list.append((nsd_refcount_xpath(nsd_id), nsd_msg)) @@ -4147,10 +4328,12 @@ class NsmRecordsPublisherProxy(object): """ This class provides a publisher interface that allows plugin objects to publish NSR/VNFR/VLR""" - def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr): + def __init__(self, dts, log, loop, project, nsr_pub_hdlr, + vnfr_pub_hdlr, vlr_pub_hdlr,): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_pub_hdlr = nsr_pub_hdlr self._vlr_pub_hdlr = vlr_pub_hdlr self._vnfr_pub_hdlr = vnfr_pub_hdlr @@ -4182,13 +4365,13 @@ class NsmRecordsPublisherProxy(object): @asyncio.coroutine def publish_vlr(self, xact, vlr): """ Publish a VLR """ - path = VirtualLinkRecord.vlr_xpath(vlr) + path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) return (yield from self._vlr_pub_hdlr.update(xact, path, vlr)) @asyncio.coroutine def unpublish_vlr(self, xact, vlr): """ Unpublish a VLR """ - path = VirtualLinkRecord.vlr_xpath(vlr) + path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) return (yield from self._vlr_pub_hdlr.delete(xact, path)) @@ -4202,24 +4385,35 @@ class ScalingRpcHandler(mano_dts.DtsHandler): ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT') - def __init__(self, log, dts, loop, callback=None): - super().__init__(log, dts, loop) + def __init__(self, log, dts, loop, project, callback=None): + super().__init__(log, dts, loop, project) self.callback = callback self.last_instance_id = defaultdict(int) + self._regh_in = None + self._regh_out = None @asyncio.coroutine def register(self): + if self._regh_in: + self._log.warning("RPC already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_scale_in_prepare(xact_info, action, ks_path, msg): assert action == rwdts.QueryAction.RPC try: + if not self._project.rpc_check(msg, xact_info=xact_info): + return + if self.callback: self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ - "instance_id": msg.instance_id}) + "instance_id": msg.instance_id, + "project_name": self._project.name,}) xact_info.respond_xpath( rwdts.XactRspCode.ACK, @@ -4237,6 +4431,9 @@ class ScalingRpcHandler(mano_dts.DtsHandler): assert action == rwdts.QueryAction.RPC try: + if not self._project.rpc_check(msg, xact_info=xact_info): + return + scaling_group = msg.scaling_group_name_ref if not msg.instance_id: last_instance_id = self.last_instance_id[scale_group] @@ -4247,7 +4444,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ - "instance_id": msg.instance_id}) + "instance_id": msg.instance_id, + "project_name": self._project.name,}) xact_info.respond_xpath( rwdts.XactRspCode.ACK, @@ -4266,32 +4464,38 @@ class ScalingRpcHandler(mano_dts.DtsHandler): on_prepare=on_scale_out_prepare) with self.dts.group_create() as group: - group.register( - xpath=self.__class__.SCALE_IN_INPUT_XPATH, - handler=scale_in_hdl, - flags=rwdts.Flag.PUBLISHER) - group.register( - xpath=self.__class__.SCALE_OUT_INPUT_XPATH, - handler=scale_out_hdl, - flags=rwdts.Flag.PUBLISHER) - + self._regh_in = group.register( + xpath=self.__class__.SCALE_IN_INPUT_XPATH, + handler=scale_in_hdl, + flags=rwdts.Flag.PUBLISHER) + self._regh_out = group.register( + xpath=self.__class__.SCALE_OUT_INPUT_XPATH, + handler=scale_out_hdl, + flags=rwdts.Flag.PUBLISHER) + + def deregister(self): + self._log.debug("De-register scale RPCs for project {}". + format(self._project.name)) + if self._regh_in: + self._regh_in.deregister() + self._regh_in = None + if self._regh_out: + self._regh_out.deregister() + self._regh_out = None + + +class NsmProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(NsmProject, self).__init__(tasklet.log, name) + self.update(tasklet) -class NsmTasklet(rift.tasklets.Tasklet): - """ - The network service manager tasklet - """ - def __init__(self, *args, **kwargs): - super(NsmTasklet, self).__init__(*args, **kwargs) - self.rwlog.set_category("rw-mano-log") - self.rwlog.set_subcategory("nsm") - - self._dts = None self._nsm = None self._ro_plugin_selector = None self._vnffgmgr = None - self._nsr_handler = None + self._nsr_pub_handler = None self._vnfr_pub_handler = None self._vlr_pub_handler = None self._vnfd_pub_handler = None @@ -4299,57 +4503,33 @@ class NsmTasklet(rift.tasklets.Tasklet): self._records_publisher_proxy = None - def start(self): - """ The task start callback """ - super(NsmTasklet, self).start() - self.log.info("Starting NsmTasklet") - - self.log.debug("Registering with dts") - self._dts = rift.tasklets.DTS(self.tasklet_info, - RwNsmYang.get_schema(), - self.loop, - self.on_dts_state_change) - - self.log.debug("Created DTS Api GI Object: %s", self._dts) - - def stop(self): - try: - self._dts.deinit() - except Exception: - print("Caught Exception in NSM stop:", sys.exc_info()[0]) - raise - - def on_instance_started(self): - """ Task instance started callback """ - self.log.debug("Got instance started callback") - @asyncio.coroutine - def init(self): - """ Task init callback """ - self.log.debug("Got instance started callback") - - self.log.debug("creating config account handler") - - self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop) + def register(self): + self._nsr_pub_handler = publisher.NsrOpDataDtsHandler( + self._dts, self.log, self.loop, self) yield from self._nsr_pub_handler.register() - self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop) + self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler( + self._dts, self.log, self.loop, self) yield from self._vnfr_pub_handler.register() - self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop) + self._vlr_pub_handler = publisher.VlrPublisherDtsHandler( + self._dts, self.log, self.loop, self) yield from self._vlr_pub_handler.register() - manifest = self.tasklet_info.get_pb_manifest() + manifest = self._tasklet.tasklet_info.get_pb_manifest() use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl ssl_cert = manifest.bootstrap_phase.rwsecurity.cert ssl_key = manifest.bootstrap_phase.rwsecurity.key - self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop) + self._vnfd_pub_handler = publisher.VnfdPublisher( + use_ssl, ssl_cert, ssl_key, self.loop, self) self._records_publisher_proxy = NsmRecordsPublisherProxy( self._dts, self.log, self.loop, + self, self._nsr_pub_handler, self._vnfr_pub_handler, self._vlr_pub_handler, @@ -4361,6 +4541,7 @@ class NsmTasklet(rift.tasklets.Tasklet): self._dts, self.log, self.loop, + self, self._records_publisher_proxy, ) yield from self._ro_plugin_selector.register() @@ -4368,28 +4549,99 @@ class NsmTasklet(rift.tasklets.Tasklet): self._cloud_account_handler = cloud.CloudAccountConfigSubscriber( self._log, self._dts, - self.log_hdl) + self.log_hdl, + self, + ) yield from self._cloud_account_handler.register() - self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop) + self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop, self) yield from self._vnffgmgr.register() self._nsm = NsManager( self._dts, self.log, self.loop, + self, self._nsr_pub_handler, self._vnfr_pub_handler, self._vlr_pub_handler, self._ro_plugin_selector, self._vnffgmgr, self._vnfd_pub_handler, - self._cloud_account_handler + self._cloud_account_handler, ) yield from self._nsm.register() + def deregister(self): + self._log.debug("Project {} de-register".format(self.name)) + self._nsm.deregister() + self._vnffgmgr.deregister() + self._cloud_account_handler.deregister() + self._ro_plugin_selector.deregister() + self._nsm = None + + @asyncio.coroutine + def delete_prepare(self): + # Check if any NS instance is present + if self._nsm and self._nsm._nsrs: + return False + return True + + +class NsmTasklet(rift.tasklets.Tasklet): + """ + The network service manager tasklet + """ + def __init__(self, *args, **kwargs): + super(NsmTasklet, self).__init__(*args, **kwargs) + self.rwlog.set_category("rw-mano-log") + self.rwlog.set_subcategory("nsm") + + self._dts = None + self.project_handler = None + self.projects = {} + + @property + def dts(self): + return self._dts + + def start(self): + """ The task start callback """ + super(NsmTasklet, self).start() + self.log.info("Starting NsmTasklet") + + self.log.debug("Registering with dts") + self._dts = rift.tasklets.DTS(self.tasklet_info, + RwNsmYang.get_schema(), + self.loop, + self.on_dts_state_change) + + self.log.debug("Created DTS Api GI Object: %s", self._dts) + + def stop(self): + try: + self._dts.deinit() + except Exception: + print("Caught Exception in NSM stop:", sys.exc_info()[0]) + raise + + def on_instance_started(self): + """ Task instance started callback """ + self.log.debug("Got instance started callback") + + @asyncio.coroutine + def init(self): + """ Task init callback """ + self.log.debug("Got instance started callback") + + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, NsmProject) + self.project_handler.register() + + + @asyncio.coroutine def run(self): """ Task run callback """