X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwnsmtasklet.py;h=0797b5583216e2e047d667ac53b6592716cc19c1;hb=f49375710db1acf3cd74c8651d098b7a08e8d0b2;hp=9877ae9ca11725588ce4d10c1a5e13a4b783ae73;hpb=d0d7b8a40207d7c67e72c65879226c441e443d05;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index 9877ae9c..0797b558 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -1,4 +1,4 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -61,6 +61,12 @@ import rift.tasklets import rift.mano.ncclient import rift.mano.config_data.config import rift.mano.dts as mano_dts +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, + get_add_delete_update_cfgs, + DEFAULT_PROJECT, + ) from . import rwnsm_conman as conman from . import cloud @@ -220,22 +226,20 @@ class VnffgRecord(object): if self._vnffgr_state == VnffgRecordState.INIT: vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, "operational_status": 'init', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) elif self._vnffgr_state == VnffgRecordState.TERMINATED: vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, "operational_status": 'terminated', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) else: try: vnffgr = self._vnffgmgr.fetch_vnffgr(self._vnffgr_id) @@ -243,13 +247,12 @@ class VnffgRecord(object): self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id) self._vnffgr_state = VnffgRecordState.FAILED vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, "operational_status": 'failed', } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) return vnffgr @@ -257,12 +260,11 @@ class VnffgRecord(object): def vnffgr_create_msg(self): """ Virtual Link Record message for Creating VLR in VNS """ vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, } - vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) + vnffgr = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) for rsp in self._vnffgd_msg.rsp: vnffgr_rsp = vnffgr.rsp.add() vnffgr_rsp.id = str(uuid.uuid4()) @@ -274,9 +276,11 @@ class VnffgRecord(object): vnfd = [vnfr.vnfd for vnfr in self._nsr.vnfrs.values() if vnfr.vnfd.id == rsp_cp_ref.vnfd_id_ref] self._log.debug("VNFD message during VNFFG instantiation is %s",vnfd) if len(vnfd) > 0 and vnfd[0].has_field('service_function_type'): - self._log.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type) + self._log.debug("Service Function Type for VNFD ID %s is %s", + rsp_cp_ref.vnfd_id_ref, vnfd[0].service_function_type) else: - self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref.vnfd_id_ref) + self._log.error("Service Function Type not available for VNFD ID %s; Skipping in chain", + rsp_cp_ref.vnfd_id_ref) continue vnfr_cp_ref = vnffgr_rsp.vnfr_connection_point_ref.add() @@ -297,7 +301,8 @@ class VnffgRecord(object): self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) if vnfr.operational_status == 'failed': self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % + (self.id, vnfr.id)) yield from asyncio.sleep(2, loop=self._loop) vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) @@ -324,7 +329,8 @@ class VnffgRecord(object): rsp_id_ref = _rsp[0].id rsp_name = _rsp[0].name else: - self._log.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id) + self._log.error("RSP with ID %s not found during classifier creation for classifier id %s", + vnffgd_classifier.rsp_id_ref,vnffgd_classifier.id) continue vnffgr_classifier = vnffgr.classifier.add() vnffgr_classifier.id = vnffgd_classifier.id @@ -348,7 +354,8 @@ class VnffgRecord(object): self._log.info("Received vnf op status is %s; retrying",vnfr.operational_status) if vnfr.operational_status == 'failed': self._log.error("Fetching VNFR for %s failed", vnfr.id) - raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self.id, vnfr.id)) + raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % + (self.id, vnfr.id)) yield from asyncio.sleep(2, loop=self._loop) vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) @@ -361,8 +368,9 @@ class VnffgRecord(object): for ext_intf in vdu.external_interface: if ext_intf.name == vnffgr_classifier.vnfr_connection_point_ref: vnffgr_classifier.vm_id = vdu.vim_id - self._log.debug("VIM ID for CP %s in VNFR %s is %s",cp.name,nsr_vnfr.id, - vnfr_cp_ref.connection_point_params.vm_id) + self._log.debug("VIM ID for CP %s in VNFR %s is %s", + cp.name,nsr_vnfr.id, + vnfr_cp_ref.connection_point_params.vm_id) break self._log.info("VNFFGR msg to be sent is %s", vnffgr) @@ -460,9 +468,12 @@ class VnffgRecord(object): class VirtualLinkRecord(object): """ Virtual Link Records class""" + XPATH = "D,/vlr:vlr-catalog/vlr:vlr" @staticmethod @asyncio.coroutine - def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False): + def create_record(dts, log, loop, project, nsr_name, vld_msg, + cloud_account_name, om_datacenter, ip_profile, + nsr_id, restart_mode=False): """Creates a new VLR object based on the given data. If restart mode is enabled, then we look for existing records in the @@ -475,6 +486,7 @@ class VirtualLinkRecord(object): dts, log, loop, + project, nsr_name, vld_msg, cloud_account_name, @@ -485,7 +497,7 @@ class VirtualLinkRecord(object): if restart_mode: res_iter = yield from dts.query_read( - "D,/vlr:vlr-catalog/vlr:vlr", + project.add_project("D,/vlr:vlr-catalog/vlr:vlr"), rwdts.XactFlag.MERGE) for fut in res_iter: @@ -501,10 +513,12 @@ class VirtualLinkRecord(object): return vlr_obj - def __init__(self, dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id): + def __init__(self, dts, log, loop, project, nsr_name, vld_msg, + cloud_account_name, om_datacenter, ip_profile, nsr_id): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_name = nsr_name self._vld_msg = vld_msg self._cloud_account_name = cloud_account_name @@ -515,11 +529,13 @@ class VirtualLinkRecord(object): self._vlr_id = str(uuid.uuid4()) self._state = VlRecordState.INIT self._prev_state = None - + self._create_time = int(time.time()) + @property def xpath(self): """ path for this object """ - return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self._vlr_id) + return self._project.add_project("D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']". + format(self._vlr_id)) @property def id(self): @@ -608,6 +624,7 @@ class VirtualLinkRecord(object): "nsr_id_ref": self._nsr_id, "vld_ref": self.vld_msg.id, "name": self.name, + "create_time": self._create_time, "cloud_account": self.cloud_account_name, "om_datacenter": self.om_datacenter_name, } @@ -616,7 +633,7 @@ class VirtualLinkRecord(object): vlr_dict['ip_profile_params' ] = self._ip_profile.ip_profile_params.as_dict() vlr_dict.update(vld_copy_dict) - vlr = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict(vlr_dict) + vlr = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict(vlr_dict) return vlr def reset_id(self, vlr_id): @@ -624,7 +641,7 @@ class VirtualLinkRecord(object): def create_nsr_vlr_msg(self, vnfrs): """ The VLR message""" - nsr_vlr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr() + nsr_vlr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_Vlr() nsr_vlr.vlr_ref = self._vlr_id nsr_vlr.assigned_subnet = self.assigned_subnet nsr_vlr.cloud_account = self.cloud_account_name @@ -645,7 +662,6 @@ class VirtualLinkRecord(object): @asyncio.coroutine def instantiate(self): """ Instantiate this VL """ - self._log.debug("Instaniating VLR key %s, vld %s", self.xpath, self._vld_msg) vlr = None @@ -723,7 +739,7 @@ class VirtualNetworkFunctionRecord(object): @staticmethod @asyncio.coroutine - def create_record(dts, log, loop, vnfd, const_vnfd_msg, nsd_id, nsr_name, + def create_record(dts, log, loop, project, vnfd, const_vnfd_msg, nsd_id, nsr_name, cloud_account_name, om_datacenter_name, nsr_id, group_name, group_instance_id, placement_groups, restart_mode=False): """Creates a new VNFR object based on the given data. @@ -738,6 +754,7 @@ class VirtualNetworkFunctionRecord(object): dts, log, loop, + project, vnfd, const_vnfd_msg, nsd_id, @@ -752,7 +769,7 @@ class VirtualNetworkFunctionRecord(object): if restart_mode: res_iter = yield from dts.query_read( - "D,/vnfr:vnfr-catalog/vnfr:vnfr", + project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr"), rwdts.XactFlag.MERGE) for fut in res_iter: @@ -769,6 +786,7 @@ class VirtualNetworkFunctionRecord(object): dts, log, loop, + project, vnfd, const_vnfd_msg, nsd_id, @@ -783,6 +801,7 @@ class VirtualNetworkFunctionRecord(object): self._dts = dts self._log = log self._loop = loop + self._project = project self._vnfd = vnfd self._const_vnfd_msg = const_vnfd_msg self._nsd_id = nsd_id @@ -794,6 +813,7 @@ class VirtualNetworkFunctionRecord(object): self._group_instance_id = group_instance_id self._placement_groups = placement_groups self._config_status = NsrYang.ConfigStates.INIT + self._create_time = int(time.time()) self._prev_state = VnfRecordState.INIT self._state = VnfRecordState.INIT @@ -821,7 +841,8 @@ class VirtualNetworkFunctionRecord(object): @property def xpath(self): """ VNFR xpath """ - return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self.id) + return self._project.add_project("D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']" + .format(self.id)) @property def vnfr_msg(self): @@ -831,7 +852,9 @@ class VirtualNetworkFunctionRecord(object): @property def const_vnfr_msg(self): """ VNFR message """ - return RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(vnfr_id=self.id,cloud_account=self.cloud_account_name,om_datacenter=self._om_datacenter_name) + return RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ConstituentVnfrRef( + vnfr_id=self.id, cloud_account=self.cloud_account_name, + om_datacenter=self._om_datacenter_name) @property def vnfd(self): @@ -897,7 +920,8 @@ class VirtualNetworkFunctionRecord(object): @staticmethod def vnfr_xpath(vnfr): """ Get the VNFR path from VNFR """ - return (VirtualNetworkFunctionRecord.XPATH + "[vnfr:id = '{}']").format(vnfr.id) + return (self._project.add_project(VirtualNetworkFunctionRecord.XPATH) + + "[vnfr:id = '{}']").format(vnfr.id) @property def config_type(self): @@ -950,7 +974,6 @@ class VirtualNetworkFunctionRecord(object): vnfr_dict = { "id": self.id, "nsr_id_ref": self._nsr_id, - "vnfd_ref": self.vnfd.id, "name": self.name, "cloud_account": self._cloud_account_name, "om_datacenter": self._om_datacenter_name, @@ -958,8 +981,11 @@ class VirtualNetworkFunctionRecord(object): } vnfr_dict.update(vnfd_copy_dict) - vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict) - vnfr.member_vnf_index_ref = self.member_vnf_index + vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict(vnfr_dict) + vnfr.vnfd = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd. \ + from_dict(self.vnfd.as_dict(), + ignore_missing_keys=True) + vnfr.member_vnf_index_ref = str(self.member_vnf_index) vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict()) if self._vnfd.mgmt_interface.has_field("port"): @@ -981,7 +1007,7 @@ class VirtualNetworkFunctionRecord(object): format(self.name, self.vnfr_msg)) yield from self._dts.query_update( self.xpath, - rwdts.XactFlag.TRACE, + 0, #rwdts.XactFlag.TRACE, self.vnfr_msg ) @@ -1069,9 +1095,12 @@ class VirtualNetworkFunctionRecord(object): # For every connection point in the VNFD fill in the identifier for conn_p in self._vnfd.connection_point: - cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint() + cpr = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_ConnectionPoint() cpr.name = conn_p.name cpr.type_yang = conn_p.type_yang + if conn_p.has_field('port_security_enabled'): + cpr.port_security_enabled = conn_p.port_security_enabled + vlr_ref = find_vlr_for_cp(conn_p) if vlr_ref is None: msg = "Failed to find VLR for cp = %s" % conn_p.name @@ -1082,7 +1111,7 @@ class VirtualNetworkFunctionRecord(object): cpr.vlr_ref = vlr_ref.id self.vnfr_msg.connection_point.append(cpr) self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s", - cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd_ref) + cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id) if not self.restart_mode: yield from self._dts.query_create(self.xpath, @@ -1219,7 +1248,7 @@ class NetworkServiceStatus(object): event_list = [] idx = 1 for entry in self._events: - event = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents() + event = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_OperationalEvents() event.id = idx idx += 1 event.timestamp, event.event, event.description, event.details = entry @@ -1231,7 +1260,9 @@ class NetworkServiceRecord(object): """ Network service record """ XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr" - def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False): + def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, + sdn_account_name, key_pairs, project, restart_mode=False, + vlr_handler=None): self._dts = dts self._log = log self._loop = loop @@ -1239,6 +1270,8 @@ class NetworkServiceRecord(object): self._nsr_cfg_msg = nsr_cfg_msg self._nsm_plugin = nsm_plugin self._sdn_account_name = sdn_account_name + self._vlr_handler = vlr_handler + self._project = project self._nsd = None self._nsr_msg = None @@ -1261,6 +1294,7 @@ class NetworkServiceRecord(object): self._is_active = False self._vl_phase_completed = False self._vnf_phase_completed = False + self.vlr_uptime_tasks = {} # Initalise the state to init @@ -1271,7 +1305,7 @@ class NetworkServiceRecord(object): self.set_state(NetworkServiceRecordState.INIT) - self.substitute_input_parameters = InputParameterSubstitution(self._log) + self.substitute_input_parameters = InputParameterSubstitution(self._log, self._project) @property def nsm_plugin(self): @@ -1290,6 +1324,7 @@ class NetworkServiceRecord(object): self._vnf_phase_completed = True self._op_status.set_state(state) + self._nsm_plugin.set_state(self.id, state) @property def id(self): @@ -1386,7 +1421,7 @@ class NetworkServiceRecord(object): for group_info in self._nsr_cfg_msg.nsd_placement_group_maps: if group_info.placement_group_ref == input_group.name: - group = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo() + group = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_PlacementGroupsInfo() group_dict = {k:v for k,v in group_info.as_dict().items() if k != 'placement_group_ref'} for param in copy_dict: @@ -1452,6 +1487,23 @@ class NetworkServiceRecord(object): for vlr in self._vlrs: yield from self.nsm_plugin.instantiate_vl(self, vlr) vlr.state = VlRecordState.ACTIVE + self.vlr_uptime_tasks[vlr.id] = self._loop.create_task(self.vlr_uptime_update(vlr)) + + + def vlr_uptime_update(self, vlr): + try: + + vlr_ = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.from_dict({'id': vlr.id}) + while True: + vlr_.uptime = int(time.time()) - vlr._create_time + xpath = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) + yield from self._vlr_handler.update(None, xpath, vlr_) + yield from asyncio.sleep(2, loop=self._loop) + except asyncio.CancelledError: + self._log.debug("Received cancellation request for vlr_uptime_update task") + xpath = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) + yield from self._vlr_handler.delete(None, xpath) + @asyncio.coroutine def create(self, config_xact): @@ -1807,19 +1859,20 @@ class NetworkServiceRecord(object): self._vnffgrs[vnffgr.id] = vnffgr def resolve_vld_ip_profile(self, nsd_msg, vld): + self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref) if not vld.has_field('ip_profile_ref'): return None - profile = [ profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref ] + profile = [profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref] return profile[0] if profile else None @asyncio.coroutine def _create_vls(self, vld, cloud_account,om_datacenter): """Create a VLR in the cloud account specified using the given VLD - + Args: vld : VLD yang obj cloud_account : Cloud account name - + Returns: VirtualLinkRecord """ @@ -1827,6 +1880,7 @@ class NetworkServiceRecord(object): self._dts, self._log, self._loop, + self._project, self.name, vld, cloud_account, @@ -1887,6 +1941,7 @@ class NetworkServiceRecord(object): """ This function creates VLs for every VLD in the NSD associated with this NSR""" for vld in self.nsd_msg.vld: + self._log.debug("Found vld %s in nsr id %s", vld, self.id) cloud_account_list = self._extract_cloud_accounts_for_vl(vld) for cloud_account,om_datacenter in cloud_account_list: @@ -1913,8 +1968,8 @@ class NetworkServiceRecord(object): if vlr is None: cloud_account_list = self._extract_cloud_accounts_for_vl(vld) - for account in cloud_account_list: - vlr = yield from self._create_vls(vld, account) + for account,om_datacenter in cloud_account_list: + vlr = yield from self._create_vls(vld, account,om_datacenter) self._vlrs.append(vlr) vlr.state = VlRecordState.INSTANTIATION_PENDING @@ -2009,6 +2064,7 @@ class NetworkServiceRecord(object): vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts, self._log, self._loop, + self._project, vnfd_msg, const_vnfd, self.nsd_id, @@ -2164,23 +2220,23 @@ class NetworkServiceRecord(object): @property def nsr_xpath(self): """ Returns the xpath associated with this NSR """ - return( + return self._project.add_project(( "D,/nsr:ns-instance-opdata" + "/nsr:nsr[nsr:ns-instance-config-ref = '{}']" - ).format(self.id) + ).format(self.id)) @staticmethod def xpath_from_nsr(nsr): """ Returns the xpath associated with this NSR op data""" - return (NetworkServiceRecord.XPATH + - "[nsr:ns-instance-config-ref = '{}']").format(nsr.id) + return self._project.add_project((NetworkServiceRecord.XPATH + + "[nsr:ns-instance-config-ref = '{}']").format(nsr.id)) @property def nsd_xpath(self): """ Return NSD config xpath.""" - return( + return self._project.add_project(( "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']" - ).format(self.nsd_id) + ).format(self.nsd_id)) @asyncio.coroutine def instantiate(self, config_xact): @@ -2398,6 +2454,8 @@ class NetworkServiceRecord(object): for vlr in self.vlrs: yield from self.nsm_plugin.terminate_vl(vlr) vlr.state = VlRecordState.TERMINATED + if vlr.id in self.vlr_uptime_tasks: + self.vlr_uptime_tasks[vlr.id].cancel() self._log.debug("Terminating network service id %s", self.id) @@ -2460,7 +2518,7 @@ class NetworkServiceRecord(object): def create_msg(self): """ The network serice record as a message """ nsr_dict = {"ns_instance_config_ref": self.id} - nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict(nsr_dict) + nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict(nsr_dict) #nsr.cloud_account = self.cloud_account_name nsr.sdn_account = self._sdn_account_name nsr.name_ref = self.name @@ -2471,9 +2529,10 @@ class NetworkServiceRecord(object): nsr.config_status = self.map_config_status() nsr.config_status_details = self._config_status_details nsr.create_time = self._create_time + nsr.uptime = int(time.time()) - self._create_time for cfg_prim in self.nsd_msg.service_primitive: - cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( + cfg_prim = NsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( cfg_prim.as_dict()) nsr.service_primitive.append(cfg_prim) @@ -2613,7 +2672,7 @@ class InputParameterSubstitution(object): This class is responsible for substituting input parameters into an NSD. """ - def __init__(self, log): + def __init__(self, log, project): """Create an instance of InputParameterSubstitution Arguments: @@ -2621,6 +2680,7 @@ class InputParameterSubstitution(object): """ self.log = log + self.project = project def __call__(self, nsd, nsr_config): """Substitutes input parameters from the NSR config into the NSD @@ -2640,7 +2700,7 @@ class InputParameterSubstitution(object): # to be modified optional_input_parameters = set() for input_parameter in nsd.input_parameter_xpath: - optional_input_parameters.add(input_parameter.xpath) + optional_input_parameters.add(self.project.add_project(input_parameter.xpath)) # Apply the input parameters to the descriptor if nsr_config.input_parameter: @@ -2719,7 +2779,9 @@ class NetworkServiceDescriptor(object): @staticmethod def path_for_id(nsd_id): """ Return path for the passed nsd_id""" - return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}'".format(nsd_id) + return self._nsm._project.add_project( + "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}'". + format(nsd_id)) def path(self): """ Return the message associated with this NetworkServiceDescriptor""" @@ -2741,6 +2803,7 @@ class NsdDtsHandler(object): self._nsm = nsm self._regh = None + self._project = nsm._project @property def regh(self): @@ -2751,6 +2814,11 @@ class NsdDtsHandler(object): def register(self): """ Register for Nsd create/update/delete/read requests from dts """ + if self._regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" is_recovery = xact.xact is None and action == rwdts.AppconfAction.INSTALL @@ -2778,7 +2846,7 @@ class NsdDtsHandler(object): except Exception as e: self._log.error("Exception in cleaning up NSD libs {}: {}". format(nsd_id, e)) - self._log.excpetion(e) + self._log.exception(e) @asyncio.coroutine def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch): @@ -2818,10 +2886,17 @@ class NsdDtsHandler(object): # Need a list in scratch to store NSDs to create/update later # acg._scratch['nsds'] = list() self._regh = acg.register( - xpath=NsdDtsHandler.XPATH, + xpath=self._project.add_project(NsdDtsHandler.XPATH), flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, on_prepare=on_prepare) + def deregister(self): + self._log.debug("De-register NSD handler for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + class VnfdDtsHandler(object): """ DTS handler for VNFD config changes """ @@ -2833,6 +2908,7 @@ class VnfdDtsHandler(object): self._loop = loop self._nsm = nsm self._regh = None + self._project = nsm._project @property def regh(self): @@ -2843,6 +2919,11 @@ class VnfdDtsHandler(object): def register(self): """ Register for VNFD configuration""" + if self._regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" @@ -2883,20 +2964,28 @@ class VnfdDtsHandler(object): xact_info.respond_xpath(rwdts.XactRspCode.ACK) + xpath = self._project.add_project(VnfdDtsHandler.XPATH) self._log.debug( - "Registering for VNFD config using xpath: %s", - VnfdDtsHandler.XPATH, - ) + "Registering for VNFD config using xpath {} for project {}" + .format(xpath, self._project)) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: # Need a list in scratch to store VNFDs to create/update later # acg._scratch['vnfds'] = list() # acg._scratch['deleted_vnfds'] = list() self._regh = acg.register( - xpath=VnfdDtsHandler.XPATH, + xpath=xpath, flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY, on_prepare=on_prepare) + def deregister(self): + self._log.debug("De-register VNFD handler for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + + class NsrRpcDtsHandler(object): """ The network service instantiation RPC DTS handler """ EXEC_NSR_CONF_XPATH = "I,/nsr:start-network-service" @@ -2918,7 +3007,9 @@ class NsrRpcDtsHandler(object): self._ns_regh = None self._manager = None - self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + 'config/ns-instance-config' + self._nsr_config_url = NsrRpcDtsHandler.REST_BASE_V2_URL + \ + 'config/project/{}/ns-instance-config'. \ + format(self._nsm._project.name) self._model = RwYang.Model.create_libncx() self._model.load_schema_ypbc(RwNsrYang.get_schema()) @@ -2966,47 +3057,62 @@ class NsrRpcDtsHandler(object): def _apply_ns_instance_config(self,payload_dict): #self._log.debug("At apply NS instance config with payload %s",payload_dict) - req_hdr= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'} - response=requests.post(self._nsr_config_url, headers=req_hdr, auth=('admin', 'admin'),data=payload_dict,verify=False) + req_hdr= {'accept':'application/vnd.yang.data+json', + 'content-type':'application/vnd.yang.data+json'} + response=requests.post(self._nsr_config_url, headers=req_hdr, + auth=('admin', 'admin'),data=payload_dict,verify=False) return response @asyncio.coroutine def register(self): """ Register for NS monitoring read from dts """ + if self._ns_regh: + self._log.warning("RPC already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_ns_config_prepare(xact_info, action, ks_path, msg): """ prepare callback from dts start-network-service""" assert action == rwdts.QueryAction.RPC rpc_ip = msg + + if not self._nsm._project.rpc_check(msg, xact_info=xact_info): + return + rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({ - "nsr_id":str(uuid.uuid4()) - }) - - if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): - self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip)) - + "nsr_id":str(uuid.uuid4()), + "project_name": msg.prject_name, + }) + + if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and + ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): + self._log.error("Mandatory parameters name or nsd_ref or " + + "cloud account not found in start-network-service {}". + format(rpc_ip)) + self._log.debug("start-network-service RPC input: {}".format(rpc_ip)) try: # Add used value to the pool self._log.debug("RPC output: {}".format(rpc_op)) - + nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref) #if not self._manager: # self._manager = yield from self._connect() - + self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s", rpc_ip.name, rpc_ip.nsd_ref) ns_instance_config_dict = {"id":rpc_op.nsr_id, "admin_status":"ENABLED"} ns_instance_config_copy_dict = {k:v for k, v in rpc_ip.as_dict().items() - if k in RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr().fields} + if k in RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr().fields} ns_instance_config_dict.update(ns_instance_config_copy_dict) - ns_instance_config = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict) - ns_instance_config.nsd = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_Nsd() + ns_instance_config = RwNsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.from_dict(ns_instance_config_dict) + ns_instance_config.nsd = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_Nsd() ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict()) payload_dict = ns_instance_config.to_json(self._model) @@ -3049,6 +3155,13 @@ class NsrRpcDtsHandler(object): flags=rwdts.Flag.PUBLISHER, ) + def deregister(self): + self._log.debug("De-register NSR RPC for project {}". + format(self._nsm._project.name)) + if self._ns_regh: + self._ns_regh.deregister() + self._ns_regh = None + class NsrDtsHandler(object): """ The network service DTS handler """ @@ -3061,6 +3174,7 @@ class NsrDtsHandler(object): self._log = log self._loop = loop self._nsm = nsm + self._project = self._nsm._project self._nsr_regh = None self._scale_regh = None @@ -3075,13 +3189,18 @@ class NsrDtsHandler(object): def register(self): """ Register for Nsr create/update/delete/read requests from dts """ + if self._nsr_regh: + self._log.warning("DTS handler already registered for project {}". + format(self._project.name)) + return + def nsr_id_from_keyspec(ks): - nsr_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks) + nsr_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr.schema().keyspec_to_entry(ks) nsr_id = nsr_path_entry.key00.id return nsr_id def group_name_from_keyspec(ks): - group_path_entry = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks) + group_path_entry = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup.schema().keyspec_to_entry(ks) group_name = group_path_entry.key00.scaling_group_name_ref return group_name @@ -3172,39 +3291,13 @@ class NsrDtsHandler(object): for vld in vl_delta["deleted"]: yield from self._nsm.nsr_terminate_vl(nsr_id, vld) - def get_add_delete_update_cfgs(dts_member_reg, xact, key_name, scratch): - # Unfortunately, it is currently difficult to figure out what has exactly - # changed in this xact without Pbdelta support (RIFT-4916) - # As a workaround, we can fetch the pre and post xact elements and - # perform a comparison to figure out adds/deletes/updates - xact_cfgs = list(dts_member_reg.get_xact_elements(xact)) - curr_cfgs = list(dts_member_reg.elements) - - xact_key_map = {getattr(cfg, key_name): cfg for cfg in xact_cfgs} - curr_key_map = {getattr(cfg, key_name): cfg for cfg in curr_cfgs} - - # Find Adds - added_keys = set(xact_key_map) - set(curr_key_map) - added_cfgs = [xact_key_map[key] for key in added_keys] - - # Find Deletes - deleted_keys = set(curr_key_map) - set(xact_key_map) - deleted_cfgs = [curr_key_map[key] for key in deleted_keys] - - # Find Updates - updated_keys = set(curr_key_map) & set(xact_key_map) - updated_cfgs = [xact_key_map[key] for key in updated_keys - if xact_key_map[key] != curr_key_map[key]] - - return added_cfgs, deleted_cfgs, updated_cfgs - def get_nsr_key_pairs(dts_member_reg, xact): key_pairs = {} for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True): - self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec)) + self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec)) xpath = keyspec.to_xpath(RwNsrYang.get_schema()) key_pairs[instance_cfg.name] = instance_cfg - return key_pairs + return key_pairs def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" @@ -3246,7 +3339,11 @@ class NsrDtsHandler(object): def begin_instantiation(nsr): # Begin instantiation self._log.info("Beginning NS instantiation: %s", nsr.id) - yield from self._nsm.instantiate_ns(nsr.id, xact) + try: + yield from self._nsm.instantiate_ns(nsr.id, xact) + except Exception as e: + self._log.exception("NS instantiation: {}".format(e)) + raise e self._log.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)", xact, action, scratch) @@ -3262,8 +3359,7 @@ class NsrDtsHandler(object): (added_msgs, deleted_msgs, updated_msgs) = get_add_delete_update_cfgs(self._nsr_regh, xact, - "id", - scratch) + "id") self._log.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs, deleted_msgs, updated_msgs) @@ -3374,24 +3470,40 @@ class NsrDtsHandler(object): acg.handle.prepare_complete_ok(xact_info.handle) - self._log.debug("Registering for NSR config using xpath: %s", - NsrDtsHandler.NSR_XPATH) + xpath = self._project.add_project(NsrDtsHandler.NSR_XPATH) + self._log.debug("Registering for NSR config using xpath: {}". + format(xpath)) acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply) with self._dts.appconf_group_create(handler=acg_hdl) as acg: - self._nsr_regh = acg.register(xpath=NsrDtsHandler.NSR_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, - on_prepare=on_prepare) + self._nsr_regh = acg.register( + xpath=xpath, + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, + on_prepare=on_prepare + ) self._scale_regh = acg.register( - xpath=NsrDtsHandler.SCALE_INSTANCE_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE, - ) + xpath=self._project.add_project(NsrDtsHandler.SCALE_INSTANCE_XPATH), + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY| rwdts.Flag.CACHE, + ) self._key_pair_regh = acg.register( - xpath=NsrDtsHandler.KEY_PAIR_XPATH, - flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, - ) + xpath=self._project.add_project(NsrDtsHandler.KEY_PAIR_XPATH), + flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY | rwdts.Flag.CACHE, + ) + + def deregister(self): + self._log.debug("De-register NSR config for project {}". + format(self._project.name)) + if self._nsr_regh: + self._nsr_regh.deregister() + self._nsr_regh = None + if self._scale_regh: + self._scale_regh.deregister() + self._scale_regh = None + if self._key_pair_regh: + self._key_pair_regh.deregister() + self._key_pair_regh = None class NsrOpDataDtsHandler(object): @@ -3403,6 +3515,8 @@ class NsrOpDataDtsHandler(object): self._log = log self._loop = loop self._nsm = nsm + + self._project = nsm._project self._regh = None @property @@ -3418,39 +3532,55 @@ class NsrOpDataDtsHandler(object): @asyncio.coroutine def register(self): """ Register for Nsr op data publisher registration""" - self._log.debug("Registering Nsr op data path %s as publisher", - NsrOpDataDtsHandler.XPATH) + if self._regh: + self._log.warning("NSR op data handler already registered for project {}". + format(self._project.name)) + return + + xpath = self._project.add_project(NsrOpDataDtsHandler.XPATH) + self._log.debug("Registering Nsr op data path {} as publisher". + format(xpath)) hdl = rift.tasklets.DTS.RegistrationHandler() handlers = rift.tasklets.Group.Handler() with self._dts.group_create(handler=handlers) as group: - self._regh = group.register(xpath=NsrOpDataDtsHandler.XPATH, + self._regh = group.register(xpath=xpath, handler=hdl, flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ | rwdts.Flag.DATASTORE) + def deregister(self): + self._log.debug("De-register NSR opdata for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + @asyncio.coroutine - def create(self, path, msg): + def create(self, xpath, msg): """ Create an NS record in DTS with the path and message """ + path = self._project.add_project(xpath) self._log.debug("Creating NSR %s:%s", path, msg) self.regh.create_element(path, msg) self._log.debug("Created NSR, %s:%s", path, msg) @asyncio.coroutine - def update(self, path, msg, flags=rwdts.XactFlag.REPLACE): + def update(self, xpath, msg, flags=rwdts.XactFlag.REPLACE): """ Update an NS record in DTS with the path and message """ + path = self._project.add_project(xpath) self._log.debug("Updating NSR, %s:%s regh = %s", path, msg, self.regh) self.regh.update_element(path, msg, flags) self._log.debug("Updated NSR, %s:%s", path, msg) @asyncio.coroutine - def delete(self, path): + def delete(self, xpath): """ Update an NS record in DTS with the path and message """ + path = self._project.add_project(xpath) self._log.debug("Deleting NSR path:%s", path) self.regh.delete_element(path) self._log.debug("Deleted NSR path:%s", path) @@ -3481,6 +3611,11 @@ class VnfrDtsHandler(object): @asyncio.coroutine def register(self): """ Register for vnfr create/update/delete/ advises from dts """ + if self._regh: + self._log.warning("VNFR DTS handler already registered for project {}". + format(self._project.name)) + return + def on_commit(xact_info): """ The transaction has been committed """ @@ -3496,16 +3631,17 @@ class VnfrDtsHandler(object): xact_info, action, ks_path, msg ) - schema = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.schema() + schema = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.schema() path_entry = schema.keyspec_to_entry(ks_path) if path_entry.key00.id not in self._nsm._vnfrs: - self._log.error("%s request for non existent record path %s", - action, xpath) + # Check if this is a monitoring param xpath + if 'vnfr:monitoring-param' not in xpath: + self._log.error("%s request for non existent record path %s", + action, xpath) xact_info.respond_xpath(rwdts.XactRspCode.NA, xpath) return - self._log.debug("Deleting VNFR with id %s", path_entry.key00.id) if action == rwdts.QueryAction.CREATE or action == rwdts.QueryAction.UPDATE: yield from self._nsm.update_vnfr(msg) elif action == rwdts.QueryAction.DELETE: @@ -3520,10 +3656,17 @@ class VnfrDtsHandler(object): hdl = rift.tasklets.DTS.RegistrationHandler(on_commit=on_commit, on_prepare=on_prepare,) with self._dts.group_create() as group: - self._regh = group.register(xpath=VnfrDtsHandler.XPATH, + self._regh = group.register(xpath=self._nsm._project.add_project( + VnfrDtsHandler.XPATH), handler=hdl, flags=(rwdts.Flag.SUBSCRIBER),) + def deregister(self): + self._log.debug("De-register VNFR for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None class NsdRefCountDtsHandler(object): """ The NSD Ref Count DTS handler """ @@ -3550,6 +3693,11 @@ class NsdRefCountDtsHandler(object): @asyncio.coroutine def register(self): """ Register for NSD ref count read from dts """ + if self._regh: + self._log.warning("NSD ref DTS handler already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_prepare(xact_info, action, ks_path, msg): @@ -3557,7 +3705,7 @@ class NsdRefCountDtsHandler(object): xpath = ks_path.to_xpath(RwNsrYang.get_schema()) if action == rwdts.QueryAction.READ: - schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount.schema() + schema = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_NsdRefCount.schema() path_entry = schema.keyspec_to_entry(ks_path) nsd_list = yield from self._nsm.get_nsd_refcount(path_entry.key00.nsd_id_ref) for xpath, msg in nsd_list: @@ -3570,19 +3718,28 @@ class NsdRefCountDtsHandler(object): hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) with self._dts.group_create() as group: - self._regh = group.register(xpath=NsdRefCountDtsHandler.XPATH, + self._regh = group.register(xpath=self._nsm._project.add_project( + NsdRefCountDtsHandler.XPATH), handler=hdl, flags=rwdts.Flag.PUBLISHER,) + def deregister(self): + self._log.debug("De-register NSD Ref count for project {}". + format(self._project.name)) + if self._regh: + self._regh.deregister() + self._regh = None + class NsManager(object): """ The Network Service Manager class""" - def __init__(self, dts, log, loop, + def __init__(self, dts, log, loop, project, nsr_handler, vnfr_handler, vlr_handler, ro_plugin_selector, vnffgmgr, vnfd_pub_handler, cloud_account_handler): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_handler = nsr_handler self._vnfr_pub_handler = vnfr_handler self._vlr_pub_handler = vlr_handler @@ -3613,8 +3770,9 @@ class NsManager(object): VnfrDtsHandler(dts, log, loop, self), NsdRefCountDtsHandler(dts, log, loop, self), NsrDtsHandler(dts, log, loop, self), - ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback), - NsrRpcDtsHandler(dts,log,loop,self), + ScalingRpcHandler(log, dts, loop, self._project, + self.scale_rpc_callback), + NsrRpcDtsHandler(dts, log, loop, self), self._vnfd_dts_handler, self.cfgmgr_obj, ] @@ -3690,6 +3848,11 @@ class NsManager(object): for dts_handle in self._dts_handlers: yield from dts_handle.register() + def deregister(self): + """ Register all static DTS handlers """ + for dts_handle in self._dts_handlers: + yield from dts_handle.deregister() + def get_ns_by_nsr_id(self, nsr_id): """ get NSR by nsr id """ @@ -3729,12 +3892,16 @@ class NsManager(object): msg : RPC input action : Scaling Action """ - ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance - ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup + ScalingGroupInstance = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup_Instance + ScalingGroup = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_ScalingGroup - xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format( - msg.nsr_id_ref) - instance = ScalingGroupInstance.from_dict({"id": msg.instance_id}) + xpath = self._project.add_project( + ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]'). + format(msg.nsr_id_ref)) + + instance = ScalingGroupInstance.from_dict({ + "id": msg.instance_id, + "project_name": self._project.name,}) @asyncio.coroutine def get_nsr_scaling_group(): @@ -3789,7 +3956,7 @@ class NsManager(object): # msg.nsr_id_ref, # msg.scaling_group_name_ref, # msg.instance_id) - + def nsr_update_cfg(self, nsr_id, msg): nsr = self._nsrs[nsr_id] nsr.nsr_cfg_msg= msg @@ -3834,7 +4001,9 @@ class NsManager(object): nsr_msg, sdn_account_name, key_pairs, - restart_mode=restart_mode + self._project, + restart_mode=restart_mode, + vlr_handler=self._ro_plugin_selector._records_publisher._vlr_pub_hdlr ) self._nsrs[nsr_msg.id] = nsr nsm_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs) @@ -3891,7 +4060,7 @@ class NsManager(object): @asyncio.coroutine def get_nsr_config(self, nsd_id): - xpath = "C,/nsr:ns-instance-config" + xpath = self._project.add_project("C,/nsr:ns-instance-config") results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) for result in results: @@ -3965,7 +4134,7 @@ class NsManager(object): self.create_nsd(nsd) else: self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd) - self._nsds[nsd.id].update(nsd) + self._nsds[nsd.id].update(nsd) def delete_nsd(self, nsd_id): """ Delete the Network service descriptor with the passed id """ @@ -4017,9 +4186,6 @@ class NsManager(object): """ Update the virtual network function descriptor """ self._log.debug("Update virtual network function descriptor- %s", vnfd) - # Hack to remove duplicates from leaf-lists - to be fixed by RIFT-6511 - for ivld in vnfd.internal_vld: - ivld.internal_connection_point_ref = list(set(ivld.internal_connection_point_ref)) if vnfd.id not in self._vnfds: self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id) @@ -4073,18 +4239,18 @@ class NsManager(object): def nsd_refcount_xpath(nsd_id): """ xpath for ref count entry """ - return (NsdRefCountDtsHandler.XPATH + + return (self._project.add_project(NsdRefCountDtsHandler.XPATH) + "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id) nsd_list = [] if nsd_id is None or nsd_id == "": for nsd in self._nsds.values(): - nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount() + nsd_msg = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_NsdRefCount() nsd_msg.nsd_id_ref = nsd.id nsd_msg.instance_ref_count = nsd.ref_count nsd_list.append((nsd_refcount_xpath(nsd.id), nsd_msg)) elif nsd_id in self._nsds: - nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount() + nsd_msg = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_NsdRefCount() nsd_msg.nsd_id_ref = self._nsds[nsd_id].id nsd_msg.instance_ref_count = self._nsds[nsd_id].ref_count nsd_list.append((nsd_refcount_xpath(nsd_id), nsd_msg)) @@ -4099,7 +4265,10 @@ class NsManager(object): # Terminate the instances/networks assocaited with this nw service self._log.debug("Terminating the network service %s", nsr_id) - yield from self._nsrs[nsr_id].terminate() + try : + yield from self._nsrs[nsr_id].terminate() + except Exception as e: + self.log.exception("Failed to terminate NSR[id=%s]", nsr_id) # Unref the NSD yield from self.nsd_unref_by_nsr_id(nsr_id) @@ -4117,10 +4286,12 @@ class NsmRecordsPublisherProxy(object): """ This class provides a publisher interface that allows plugin objects to publish NSR/VNFR/VLR""" - def __init__(self, dts, log, loop, nsr_pub_hdlr, vnfr_pub_hdlr, vlr_pub_hdlr): + def __init__(self, dts, log, loop, project, nsr_pub_hdlr, + vnfr_pub_hdlr, vlr_pub_hdlr,): self._dts = dts self._log = log self._loop = loop + self._project = project self._nsr_pub_hdlr = nsr_pub_hdlr self._vlr_pub_hdlr = vlr_pub_hdlr self._vnfr_pub_hdlr = vnfr_pub_hdlr @@ -4152,13 +4323,13 @@ class NsmRecordsPublisherProxy(object): @asyncio.coroutine def publish_vlr(self, xact, vlr): """ Publish a VLR """ - path = VirtualLinkRecord.vlr_xpath(vlr) + path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) return (yield from self._vlr_pub_hdlr.update(xact, path, vlr)) @asyncio.coroutine def unpublish_vlr(self, xact, vlr): """ Unpublish a VLR """ - path = VirtualLinkRecord.vlr_xpath(vlr) + path = self._project.add_project(VirtualLinkRecord.vlr_xpath(vlr)) return (yield from self._vlr_pub_hdlr.delete(xact, path)) @@ -4172,24 +4343,35 @@ class ScalingRpcHandler(mano_dts.DtsHandler): ACTION = Enum('ACTION', 'SCALE_IN SCALE_OUT') - def __init__(self, log, dts, loop, callback=None): - super().__init__(log, dts, loop) + def __init__(self, log, dts, loop, project, callback=None): + super().__init__(log, dts, loop, project) self.callback = callback self.last_instance_id = defaultdict(int) + self._regh_in = None + self._regh_out = None @asyncio.coroutine def register(self): + if self._regh_in: + self._log.warning("RPC already registered for project {}". + format(self._project.name)) + return + @asyncio.coroutine def on_scale_in_prepare(xact_info, action, ks_path, msg): assert action == rwdts.QueryAction.RPC try: + if not self._project.rpc_check(msg, xact_info=xact_info): + return + if self.callback: self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ - "instance_id": msg.instance_id}) + "instance_id": msg.instance_id, + "project_name": self._project.name,}) xact_info.respond_xpath( rwdts.XactRspCode.ACK, @@ -4207,6 +4389,9 @@ class ScalingRpcHandler(mano_dts.DtsHandler): assert action == rwdts.QueryAction.RPC try: + if not self._project.rpc_check(msg, xact_info=xact_info): + return + scaling_group = msg.scaling_group_name_ref if not msg.instance_id: last_instance_id = self.last_instance_id[scale_group] @@ -4217,7 +4402,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ - "instance_id": msg.instance_id}) + "instance_id": msg.instance_id, + "project_name": self._project.name,}) xact_info.respond_xpath( rwdts.XactRspCode.ACK, @@ -4236,32 +4422,38 @@ class ScalingRpcHandler(mano_dts.DtsHandler): on_prepare=on_scale_out_prepare) with self.dts.group_create() as group: - group.register( - xpath=self.__class__.SCALE_IN_INPUT_XPATH, - handler=scale_in_hdl, - flags=rwdts.Flag.PUBLISHER) - group.register( - xpath=self.__class__.SCALE_OUT_INPUT_XPATH, - handler=scale_out_hdl, - flags=rwdts.Flag.PUBLISHER) - - -class NsmTasklet(rift.tasklets.Tasklet): - """ - The network service manager tasklet - """ - def __init__(self, *args, **kwargs): - super(NsmTasklet, self).__init__(*args, **kwargs) - self.rwlog.set_category("rw-mano-log") - self.rwlog.set_subcategory("nsm") + self._regh_in = group.register( + xpath=self.__class__.SCALE_IN_INPUT_XPATH, + handler=scale_in_hdl, + flags=rwdts.Flag.PUBLISHER) + self._regh_out = group.register( + xpath=self.__class__.SCALE_OUT_INPUT_XPATH, + handler=scale_out_hdl, + flags=rwdts.Flag.PUBLISHER) + + def deregister(self): + self._log.debug("De-register scale RPCs for project {}". + format(self._project.name)) + if self._regh_in: + self._regh_in.deregister() + self._regh_in = None + if self._regh_out: + self._regh_out.deregister() + self._regh_out = None + + +class NsmProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(NsmProject, self).__init__(tasklet.log, name) + self.update(tasklet) - self._dts = None self._nsm = None self._ro_plugin_selector = None self._vnffgmgr = None - self._nsr_handler = None + self._nsr_pub_handler = None self._vnfr_pub_handler = None self._vlr_pub_handler = None self._vnfd_pub_handler = None @@ -4269,57 +4461,33 @@ class NsmTasklet(rift.tasklets.Tasklet): self._records_publisher_proxy = None - def start(self): - """ The task start callback """ - super(NsmTasklet, self).start() - self.log.info("Starting NsmTasklet") - - self.log.debug("Registering with dts") - self._dts = rift.tasklets.DTS(self.tasklet_info, - RwNsmYang.get_schema(), - self.loop, - self.on_dts_state_change) - - self.log.debug("Created DTS Api GI Object: %s", self._dts) - - def stop(self): - try: - self._dts.deinit() - except Exception: - print("Caught Exception in NSM stop:", sys.exc_info()[0]) - raise - - def on_instance_started(self): - """ Task instance started callback """ - self.log.debug("Got instance started callback") - @asyncio.coroutine - def init(self): - """ Task init callback """ - self.log.debug("Got instance started callback") - - self.log.debug("creating config account handler") - - self._nsr_pub_handler = publisher.NsrOpDataDtsHandler(self._dts, self.log, self.loop) + def register(self): + self._nsr_pub_handler = publisher.NsrOpDataDtsHandler( + self._dts, self.log, self.loop, self) yield from self._nsr_pub_handler.register() - self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler(self._dts, self.log, self.loop) + self._vnfr_pub_handler = publisher.VnfrPublisherDtsHandler( + self._dts, self.log, self.loop, self) yield from self._vnfr_pub_handler.register() - self._vlr_pub_handler = publisher.VlrPublisherDtsHandler(self._dts, self.log, self.loop) + self._vlr_pub_handler = publisher.VlrPublisherDtsHandler( + self._dts, self.log, self.loop, self) yield from self._vlr_pub_handler.register() - manifest = self.tasklet_info.get_pb_manifest() + manifest = self._tasklet.tasklet_info.get_pb_manifest() use_ssl = manifest.bootstrap_phase.rwsecurity.use_ssl ssl_cert = manifest.bootstrap_phase.rwsecurity.cert ssl_key = manifest.bootstrap_phase.rwsecurity.key - self._vnfd_pub_handler = publisher.VnfdPublisher(use_ssl, ssl_cert, ssl_key, self.loop) + self._vnfd_pub_handler = publisher.VnfdPublisher( + use_ssl, ssl_cert, ssl_key, self.loop, self) self._records_publisher_proxy = NsmRecordsPublisherProxy( self._dts, self.log, self.loop, + self, self._nsr_pub_handler, self._vnfr_pub_handler, self._vlr_pub_handler, @@ -4331,6 +4499,7 @@ class NsmTasklet(rift.tasklets.Tasklet): self._dts, self.log, self.loop, + self, self._records_publisher_proxy, ) yield from self._ro_plugin_selector.register() @@ -4338,28 +4507,91 @@ class NsmTasklet(rift.tasklets.Tasklet): self._cloud_account_handler = cloud.CloudAccountConfigSubscriber( self._log, self._dts, - self.log_hdl) + self.log_hdl, + self, + ) yield from self._cloud_account_handler.register() - self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop) + self._vnffgmgr = rwvnffgmgr.VnffgMgr(self._dts, self.log, self.log_hdl, self.loop, self) yield from self._vnffgmgr.register() self._nsm = NsManager( self._dts, self.log, self.loop, + self, self._nsr_pub_handler, self._vnfr_pub_handler, self._vlr_pub_handler, self._ro_plugin_selector, self._vnffgmgr, self._vnfd_pub_handler, - self._cloud_account_handler + self._cloud_account_handler, ) yield from self._nsm.register() + def deregister(self): + self._log.debug("Project {} de-register".format(self.name)) + self._nsm.deregister() + self._vnffgmgr.deregister() + self._cloud_account_handler.deregister() + self._ro_plugin_selector.deregister() + + +class NsmTasklet(rift.tasklets.Tasklet): + """ + The network service manager tasklet + """ + def __init__(self, *args, **kwargs): + super(NsmTasklet, self).__init__(*args, **kwargs) + self.rwlog.set_category("rw-mano-log") + self.rwlog.set_subcategory("nsm") + + self._dts = None + self.project_handler = None + self.projects = {} + + @property + def dts(self): + return self._dts + + def start(self): + """ The task start callback """ + super(NsmTasklet, self).start() + self.log.info("Starting NsmTasklet") + + self.log.debug("Registering with dts") + self._dts = rift.tasklets.DTS(self.tasklet_info, + RwNsmYang.get_schema(), + self.loop, + self.on_dts_state_change) + + self.log.debug("Created DTS Api GI Object: %s", self._dts) + + def stop(self): + try: + self._dts.deinit() + except Exception: + print("Caught Exception in NSM stop:", sys.exc_info()[0]) + raise + + def on_instance_started(self): + """ Task instance started callback """ + self.log.debug("Got instance started callback") + + @asyncio.coroutine + def init(self): + """ Task init callback """ + self.log.debug("Got instance started callback") + + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, NsmProject) + self.project_handler.register() + + + @asyncio.coroutine def run(self): """ Task run callback """