X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwnsmtasklet.py;h=7cbc194b99dd568c7c62dd46bc080473b7756556;hb=375ebf79a245a2140c3eeaf82b1843d255c149fd;hp=9877ae9ca11725588ce4d10c1a5e13a4b783ae73;hpb=d0d7b8a40207d7c67e72c65879226c441e443d05;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index 9877ae9c..7cbc194b 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -1,4 +1,4 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -51,7 +51,7 @@ from gi.repository import ( VnfrYang, RwVnfrYang, RwNsmYang, - RwsdnYang, + RwsdnalYang, RwDts as rwdts, RwTypes, ProtobufC, @@ -111,16 +111,10 @@ class NetworkServiceDescriptorNotFound(Exception): pass -class NetworkServiceDescriptorRefCountExists(Exception): +class NetworkServiceDescriptorNotFound(Exception): """ Network Service Descriptor reference count exists """ pass - -class NetworkServiceDescriptorUnrefError(Exception): - """ Failed to unref a network service descriptor """ - pass - - class NsrInstantiationFailed(Exception): """ Failed to instantiate network service """ pass @@ -220,7 +214,6 @@ class VnffgRecord(object): if self._vnffgr_state == VnffgRecordState.INIT: vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, @@ -229,7 +222,6 @@ class VnffgRecord(object): vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) elif self._vnffgr_state == VnffgRecordState.TERMINATED: vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, @@ -243,7 +235,6 @@ class VnffgRecord(object): self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id) self._vnffgr_state = VnffgRecordState.FAILED vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, @@ -257,7 +248,6 @@ class VnffgRecord(object): def vnffgr_create_msg(self): """ Virtual Link Record message for Creating VLR in VNS """ vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, @@ -387,7 +377,7 @@ class VnffgRecord(object): vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) - sff = RwsdnYang.VNFFGSff() + sff = RwsdnalYang.VNFFGSff() sff_list[nsr_vnfr.vnfd.id] = sff sff.name = nsr_vnfr.name sff.function_type = nsr_vnfr.vnfd.service_function_chain @@ -460,6 +450,7 @@ class VnffgRecord(object): class VirtualLinkRecord(object): """ Virtual Link Records class""" + XPATH = "D,/vlr:vlr-catalog/vlr:vlr" @staticmethod @asyncio.coroutine def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False): @@ -515,7 +506,8 @@ class VirtualLinkRecord(object): self._vlr_id = str(uuid.uuid4()) self._state = VlRecordState.INIT self._prev_state = None - + self._create_time = int(time.time()) + @property def xpath(self): """ path for this object """ @@ -608,6 +600,7 @@ class VirtualLinkRecord(object): "nsr_id_ref": self._nsr_id, "vld_ref": self.vld_msg.id, "name": self.name, + "create_time": self._create_time, "cloud_account": self.cloud_account_name, "om_datacenter": self.om_datacenter_name, } @@ -645,7 +638,6 @@ class VirtualLinkRecord(object): @asyncio.coroutine def instantiate(self): """ Instantiate this VL """ - self._log.debug("Instaniating VLR key %s, vld %s", self.xpath, self._vld_msg) vlr = None @@ -794,6 +786,7 @@ class VirtualNetworkFunctionRecord(object): self._group_instance_id = group_instance_id self._placement_groups = placement_groups self._config_status = NsrYang.ConfigStates.INIT + self._create_time = int(time.time()) self._prev_state = VnfRecordState.INIT self._state = VnfRecordState.INIT @@ -950,7 +943,6 @@ class VirtualNetworkFunctionRecord(object): vnfr_dict = { "id": self.id, "nsr_id_ref": self._nsr_id, - "vnfd_ref": self.vnfd.id, "name": self.name, "cloud_account": self._cloud_account_name, "om_datacenter": self._om_datacenter_name, @@ -959,6 +951,8 @@ class VirtualNetworkFunctionRecord(object): vnfr_dict.update(vnfd_copy_dict) vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict) + vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict(), + ignore_missing_keys=True) vnfr.member_vnf_index_ref = self.member_vnf_index vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict()) @@ -1072,6 +1066,9 @@ class VirtualNetworkFunctionRecord(object): cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint() cpr.name = conn_p.name cpr.type_yang = conn_p.type_yang + if conn_p.has_field('port_security_enabled'): + cpr.port_security_enabled = conn_p.port_security_enabled + vlr_ref = find_vlr_for_cp(conn_p) if vlr_ref is None: msg = "Failed to find VLR for cp = %s" % conn_p.name @@ -1082,7 +1079,7 @@ class VirtualNetworkFunctionRecord(object): cpr.vlr_ref = vlr_ref.id self.vnfr_msg.connection_point.append(cpr) self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s", - cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd_ref) + cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id) if not self.restart_mode: yield from self._dts.query_create(self.xpath, @@ -1231,7 +1228,8 @@ class NetworkServiceRecord(object): """ Network service record """ XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr" - def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False): + def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False, + vlr_handler=None): self._dts = dts self._log = log self._loop = loop @@ -1239,6 +1237,7 @@ class NetworkServiceRecord(object): self._nsr_cfg_msg = nsr_cfg_msg self._nsm_plugin = nsm_plugin self._sdn_account_name = sdn_account_name + self._vlr_handler = vlr_handler self._nsd = None self._nsr_msg = None @@ -1261,6 +1260,7 @@ class NetworkServiceRecord(object): self._is_active = False self._vl_phase_completed = False self._vnf_phase_completed = False + self.vlr_uptime_tasks = {} # Initalise the state to init @@ -1290,6 +1290,7 @@ class NetworkServiceRecord(object): self._vnf_phase_completed = True self._op_status.set_state(state) + self._nsm_plugin.set_state(self.id, state) @property def id(self): @@ -1453,6 +1454,20 @@ class NetworkServiceRecord(object): yield from self.nsm_plugin.instantiate_vl(self, vlr) vlr.state = VlRecordState.ACTIVE + + def vlr_uptime_update(self, vlr): + try: + + vlr_ = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict({'id': vlr.id}) + while True: + vlr_.uptime = int(time.time()) - vlr._create_time + yield from self._vlr_handler.update(None, VirtualLinkRecord.vlr_xpath(vlr), vlr_) + yield from asyncio.sleep(2, loop=self._loop) + except asyncio.CancelledError: + self._log.debug("Received cancellation request for vlr_uptime_update task") + yield from self._vlr_handler.delete(None, VirtualLinkRecord.vlr_xpath(vlr)) + + @asyncio.coroutine def create(self, config_xact): """ Create this network service""" @@ -1679,7 +1694,6 @@ class NetworkServiceRecord(object): vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, om_datacenter_name, group_name, index) scale_instance.add_vnfr(vnfr) vnfrs.append(vnfr) - return vnfrs @asyncio.coroutine @@ -1700,7 +1714,8 @@ class NetworkServiceRecord(object): format(group.name, index)) scale_instance.operational_status = "failed" else: - yield from self.instantiate_vnfs(vnfrs) + yield from self.instantiate_vnfs(vnfrs, scaleout=True) + except Exception as e: self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}". @@ -1807,19 +1822,20 @@ class NetworkServiceRecord(object): self._vnffgrs[vnffgr.id] = vnffgr def resolve_vld_ip_profile(self, nsd_msg, vld): + self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref) if not vld.has_field('ip_profile_ref'): return None - profile = [ profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref ] + profile = [profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref] return profile[0] if profile else None @asyncio.coroutine def _create_vls(self, vld, cloud_account,om_datacenter): """Create a VLR in the cloud account specified using the given VLD - + Args: vld : VLD yang obj cloud_account : Cloud account name - + Returns: VirtualLinkRecord """ @@ -1887,6 +1903,7 @@ class NetworkServiceRecord(object): """ This function creates VLs for every VLD in the NSD associated with this NSR""" for vld in self.nsd_msg.vld: + self._log.debug("Found vld %s in nsr id %s", vld, self.id) cloud_account_list = self._extract_cloud_accounts_for_vl(vld) for cloud_account,om_datacenter in cloud_account_list: @@ -1913,8 +1930,8 @@ class NetworkServiceRecord(object): if vlr is None: cloud_account_list = self._extract_cloud_accounts_for_vl(vld) - for account in cloud_account_list: - vlr = yield from self._create_vls(vld, account) + for account,om_datacenter in cloud_account_list: + vlr = yield from self._create_vls(vld, account,om_datacenter) self._vlrs.append(vlr) vlr.state = VlRecordState.INSTANTIATION_PENDING @@ -1987,7 +2004,7 @@ class NetworkServiceRecord(object): (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index): group_info = self.resolve_placement_group_cloud_construct(group) if group_info is None: - self._log.error("Could not resolve cloud-construct for placement group: %s", group.name) + self._log.info("Could not resolve cloud-construct for placement group: %s", group.name) ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name)) else: self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)", @@ -2070,14 +2087,14 @@ class NetworkServiceRecord(object): return vnfr @asyncio.coroutine - def instantiate_vnfs(self, vnfrs): + def instantiate_vnfs(self, vnfrs, scaleout=False): """ This function instantiates VNFs for every VNF in this Network Service """ self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id) for vnf in vnfrs: self._log.debug("Instantiating VNF: %s in NS %s", vnf, self.id) - yield from self.nsm_plugin.instantiate_vnf(self, vnf) + yield from self.nsm_plugin.instantiate_vnf(self, vnf,scaleout) @asyncio.coroutine def instantiate_vnffgs(self): @@ -2215,14 +2232,6 @@ class NetworkServiceRecord(object): # Find the NSD self._nsd = self._nsr_cfg_msg.nsd - try: - # Update ref count if nsd present in catalog - self._nsm.get_nsd_ref(self.nsd_id) - - except NetworkServiceDescriptorError: - # This could be an NSD not in the nsd-catalog - pass - # Merge any config and initial config primitive values self.config_store.merge_nsd_config(self.nsd_msg) self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict())) @@ -2326,9 +2335,12 @@ class NetworkServiceRecord(object): def on_instantiate_done(fut): # If the do_instantiate fails, then publish NSR with failed result - if fut.exception() is not None: - self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(fut.exception())) - self._loop.create_task(self.instantiation_failed(failed_reason=str(fut.exception()))) + e = fut.exception() + if e is not None: + import traceback, sys + print(traceback.format_exception(None,e, e.__traceback__), file=sys.stderr, flush=True) + self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(e)) + self._loop.create_task(self.instantiation_failed(failed_reason=str(e))) instantiate_task = self._loop.create_task(do_instantiate()) instantiate_task.add_done_callback(on_instantiate_done) @@ -2398,6 +2410,8 @@ class NetworkServiceRecord(object): for vlr in self.vlrs: yield from self.nsm_plugin.terminate_vl(vlr) vlr.state = VlRecordState.TERMINATED + if vlr.id in self.vlr_uptime_tasks: + self.vlr_uptime_tasks[vlr.id].cancel() self._log.debug("Terminating network service id %s", self.id) @@ -2471,6 +2485,7 @@ class NetworkServiceRecord(object): nsr.config_status = self.map_config_status() nsr.config_status_details = self._config_status_details nsr.create_time = self._create_time + nsr.uptime = int(time.time()) - self._create_time for cfg_prim in self.nsd_msg.service_primitive: cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( @@ -2675,8 +2690,6 @@ class NetworkServiceDescriptor(object): self._loop = loop self._nsd = nsd - self._ref_count = 0 - self._nsm = nsm @property @@ -2689,28 +2702,6 @@ class NetworkServiceDescriptor(object): """ Returns name of nsd """ return self._nsd.name - @property - def ref_count(self): - """ Returns reference count""" - return self._ref_count - - def in_use(self): - """ Returns whether nsd is in use or not """ - return True if self.ref_count > 0 else False - - def ref(self): - """ Take a reference on this object """ - self._ref_count += 1 - - def unref(self): - """ Release reference on this object """ - if self.ref_count < 1: - msg = ("Unref on a NSD object - nsd id %s, ref_count = %s" % - (self.id, self.ref_count)) - self._log.critical(msg) - raise NetworkServiceDescriptorError(msg) - self._ref_count -= 1 - @property def msg(self): """ Return the message associated with this NetworkServiceDescriptor""" @@ -2793,11 +2784,6 @@ class NsdDtsHandler(object): if fref.is_field_deleted(): # Delete an NSD record self._log.debug("Deleting NSD with id %s", msg.id) - if self._nsm.nsd_in_use(msg.id): - self._log.debug("Cannot delete NSD in use - %s", msg.id) - err = "Cannot delete an NSD in use - %s" % msg.id - raise NetworkServiceDescriptorRefCountExists(err) - yield from delete_nsd_libs(msg.id) self._nsm.delete_nsd(msg.id) else: @@ -2981,22 +2967,22 @@ class NsrRpcDtsHandler(object): rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({ "nsr_id":str(uuid.uuid4()) }) - + if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip)) - + self._log.debug("start-network-service RPC input: {}".format(rpc_ip)) try: # Add used value to the pool self._log.debug("RPC output: {}".format(rpc_op)) - + nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref) #if not self._manager: # self._manager = yield from self._connect() - + self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s", rpc_ip.name, rpc_ip.nsd_ref) @@ -3201,10 +3187,10 @@ class NsrDtsHandler(object): def get_nsr_key_pairs(dts_member_reg, xact): key_pairs = {} for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True): - self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec)) + self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec)) xpath = keyspec.to_xpath(RwNsrYang.get_schema()) key_pairs[instance_cfg.name] = instance_cfg - return key_pairs + return key_pairs def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" @@ -3354,6 +3340,8 @@ class NsrDtsHandler(object): raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined") if msg.has_field("scaling_group"): + self._log.debug("ScaleMsg %s", msg) + self._log.debug("NSSCALINGSTATE %s", nsr.state) if nsr.state != NetworkServiceRecordState.RUNNING: raise ScalingOperationError("Unable to perform scaling action when NS is not in running state") @@ -3525,56 +3513,6 @@ class VnfrDtsHandler(object): flags=(rwdts.Flag.SUBSCRIBER),) -class NsdRefCountDtsHandler(object): - """ The NSD Ref Count DTS handler """ - XPATH = "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count" - - def __init__(self, dts, log, loop, nsm): - self._dts = dts - self._log = log - self._loop = loop - self._nsm = nsm - - self._regh = None - - @property - def regh(self): - """ Return registration handle """ - return self._regh - - @property - def nsm(self): - """ Return the NS manager instance """ - return self._nsm - - @asyncio.coroutine - def register(self): - """ Register for NSD ref count read from dts """ - - @asyncio.coroutine - def on_prepare(xact_info, action, ks_path, msg): - """ prepare callback from dts """ - xpath = ks_path.to_xpath(RwNsrYang.get_schema()) - - if action == rwdts.QueryAction.READ: - schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount.schema() - path_entry = schema.keyspec_to_entry(ks_path) - nsd_list = yield from self._nsm.get_nsd_refcount(path_entry.key00.nsd_id_ref) - for xpath, msg in nsd_list: - xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE, - xpath=xpath, - msg=msg) - xact_info.respond_xpath(rwdts.XactRspCode.ACK) - else: - raise NetworkServiceRecordError("Not supported operation %s" % action) - - hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) - with self._dts.group_create() as group: - self._regh = group.register(xpath=NsdRefCountDtsHandler.XPATH, - handler=hdl, - flags=rwdts.Flag.PUBLISHER,) - - class NsManager(object): """ The Network Service Manager class""" def __init__(self, dts, log, loop, @@ -3591,12 +3529,17 @@ class NsManager(object): self._cloud_account_handler = cloud_account_handler self._ro_plugin_selector = ro_plugin_selector - self._ncclient = rift.mano.ncclient.NcClient( - host="127.0.0.1", - port=2022, - username="admin", - password="admin", - loop=self._loop) + + # Intialize the set of variables for implementing Scaling RPC using REST. + self._headers = {"content-type":"application/json", "accept":"application/json"} + #This will break when we have rbac in the rift code and admin user password is changed or admin it self is removed. + self._user = 'admin' + self._password = 'admin' + self._ip = 'localhost' + self._rport = 8008 + self._conf_url = "https://{ip}:{port}/api/config". \ + format(ip=self._ip, + port=self._rport) self._nsrs = {} self._nsds = {} @@ -3611,7 +3554,6 @@ class NsManager(object): self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self) self._dts_handlers = [self._nsd_dts_handler, VnfrDtsHandler(dts, log, loop, self), - NsdRefCountDtsHandler(dts, log, loop, self), NsrDtsHandler(dts, log, loop, self), ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback), NsrRpcDtsHandler(dts,log,loop,self), @@ -3729,67 +3671,83 @@ class NsManager(object): msg : RPC input action : Scaling Action """ - ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance - ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup - - xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format( - msg.nsr_id_ref) - instance = ScalingGroupInstance.from_dict({"id": msg.instance_id}) - - @asyncio.coroutine - def get_nsr_scaling_group(): - results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) + def get_scaling_group_information(): + scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False) + if output.text == None or len(output.text) == 0: + self.log.error("nsr id %s information not present", self._nsr_id) + return None + scaling_group_info = json.loads(output.text) + return scaling_group_info + + def config_scaling_group_information(scaling_group_info): + data_str = json.dumps(scaling_group_info) + self.log.debug("scaling group Info %s", data_str) + + scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + response = requests.put(scale_out_url, data=data_str, verify=False, auth=(self._user, self._password), headers=self._headers) + response.raise_for_status() - for result in results: - res = yield from result - nsr_config = res.result + def scale_out(): + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return - for scaling_group in nsr_config.scaling_group: - if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref: - break - else: - scaling_group = nsr_config.scaling_group.add() - scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref + scaling_group_present = False + if "scaling-group" in scaling_group_info["nsr:nsr"]: + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' not in scaling_group: + scaling_group['instance'] = [] + for instance in scaling_group['instance']: + if instance["id"] == int(msg.instance_id): + self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id) + return + scaling_group["instance"].append({"id": int(msg.instance_id)}) + + if not scaling_group_present: + scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, "instance": [{"id": msg.instance_id}]}] + + config_scaling_group_information(scaling_group_info) + return - return (nsr_config, scaling_group) + def scale_in(): + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return - @asyncio.coroutine - def update_config(nsr_config): - xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config) - xml = '{}'.format(xml) - yield from self._ncclient.connect() - yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace") + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + scaling_group_present = False + instance_id_present = False + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' in scaling_group: + instance_array = scaling_group["instance"]; + for index in range(len(instance_array)): + if instance_array[index]["id"] == int(msg.instance_id): + instance_array.pop(index) + instance_id_present = True + break + + if not scaling_group_present: + self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref) + return - @asyncio.coroutine - def scale_out(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.append(instance) - yield from update_config(nsr_config) + if not instance_id_present: + self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id) + return - @asyncio.coroutine - def scale_in(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.remove(instance) - yield from update_config(nsr_config) + config_scaling_group_information(scaling_group_info) + return if action == ScalingRpcHandler.ACTION.SCALE_OUT: - self._loop.create_task(scale_out()) + self._loop.run_in_executor(None, scale_out) else: - self._loop.create_task(scale_in()) - - # Opdata based calls, disabled for now! - # if action == ScalingRpcHandler.ACTION.SCALE_OUT: - # self.scale_nsr_out( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id, - # xact) - # else: - # self.scale_nsr_in( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id) - + self._loop.run_in_executor(None, scale_in) + def nsr_update_cfg(self, nsr_id, msg): nsr = self._nsrs[nsr_id] nsr.nsr_cfg_msg= msg @@ -3814,6 +3772,7 @@ class NsManager(object): def create_nsr(self, nsr_msg, key_pairs=None,restart_mode=False): """ Create an NSR instance """ + self._log.debug("NSRMSG %s", nsr_msg) if nsr_msg.id in self._nsrs: msg = "NSR id %s already exists" % nsr_msg.id self._log.error(msg) @@ -3834,7 +3793,8 @@ class NsManager(object): nsr_msg, sdn_account_name, key_pairs, - restart_mode=restart_mode + restart_mode=restart_mode, + vlr_handler=self._ro_plugin_selector._records_publisher._vlr_pub_hdlr ) self._nsrs[nsr_msg.id] = nsr nsm_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs) @@ -3882,13 +3842,6 @@ class NsManager(object): """ Delete VNFR with the passed id""" del self._vnfrs[vnfr_id] - def get_nsd_ref(self, nsd_id): - """ Get network service descriptor for the passed nsd_id - with a reference""" - nsd = self.get_nsd(nsd_id) - nsd.ref() - return nsd - @asyncio.coroutine def get_nsr_config(self, nsd_id): xpath = "C,/nsr:ns-instance-config" @@ -3904,33 +3857,6 @@ class NsManager(object): return None - @asyncio.coroutine - def nsd_unref_by_nsr_id(self, nsr_id): - """ Unref the network service descriptor based on NSR id """ - self._log.debug("NSR Unref called for Nsr Id:%s", nsr_id) - if nsr_id in self._nsrs: - nsr = self._nsrs[nsr_id] - - try: - nsd = self.get_nsd(nsr.nsd_id) - self._log.debug("Releasing ref on NSD %s held by NSR %s - Curr %d", - nsd.id, nsr.id, nsd.ref_count) - nsd.unref() - except NetworkServiceDescriptorError: - # We store a copy of NSD in NSR and the NSD in nsd-catalog - # could be deleted - pass - - else: - self._log.error("Cannot find NSR with id %s", nsr_id) - raise NetworkServiceDescriptorUnrefError("No NSR with id" % nsr_id) - - @asyncio.coroutine - def nsd_unref(self, nsd_id): - """ Unref the network service descriptor associated with the id """ - nsd = self.get_nsd(nsd_id) - nsd.unref() - def get_nsd(self, nsd_id): """ Get network service descriptor for the passed nsd_id""" if nsd_id not in self._nsds: @@ -3965,7 +3891,7 @@ class NsManager(object): self.create_nsd(nsd) else: self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd) - self._nsds[nsd.id].update(nsd) + self._nsds[nsd.id].update(nsd) def delete_nsd(self, nsd_id): """ Delete the Network service descriptor with the passed id """ @@ -3973,16 +3899,6 @@ class NsManager(object): if nsd_id not in self._nsds: self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id) raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id) - - if nsd_id not in self._nsds: - self._log.debug("Cannot delete NSD id %s reference exists %s", - nsd_id, - self._nsds[nsd_id].ref_count) - raise NetworkServiceDescriptorRefCountExists( - "Cannot delete :%s, ref_count:%s", - nsd_id, - self._nsds[nsd_id].ref_count) - del self._nsds[nsd_id] def get_vnfd_config(self, xact): @@ -4017,9 +3933,6 @@ class NsManager(object): """ Update the virtual network function descriptor """ self._log.debug("Update virtual network function descriptor- %s", vnfd) - # Hack to remove duplicates from leaf-lists - to be fixed by RIFT-6511 - for ivld in vnfd.internal_vld: - ivld.internal_connection_point_ref = list(set(ivld.internal_connection_point_ref)) if vnfd.id not in self._vnfds: self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id) @@ -4038,13 +3951,6 @@ class NsManager(object): del self._vnfds[vnfd_id] - def nsd_in_use(self, nsd_id): - """ Is the NSD with the passed id in use """ - self._log.debug("Is this NSD in use - msg:%s", nsd_id) - if nsd_id in self._nsds: - return self._nsds[nsd_id].in_use() - return False - @asyncio.coroutine def publish_nsr(self, xact, path, msg): """ Publish a NSR """ @@ -4067,29 +3973,6 @@ class NsManager(object): raise VirtualNetworkFunctionRecordError(err) self._vnfrs[vnfr_id].is_ready() - @asyncio.coroutine - def get_nsd_refcount(self, nsd_id): - """ Get the nsd_list from this NSM""" - - def nsd_refcount_xpath(nsd_id): - """ xpath for ref count entry """ - return (NsdRefCountDtsHandler.XPATH + - "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id) - - nsd_list = [] - if nsd_id is None or nsd_id == "": - for nsd in self._nsds.values(): - nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount() - nsd_msg.nsd_id_ref = nsd.id - nsd_msg.instance_ref_count = nsd.ref_count - nsd_list.append((nsd_refcount_xpath(nsd.id), nsd_msg)) - elif nsd_id in self._nsds: - nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount() - nsd_msg.nsd_id_ref = self._nsds[nsd_id].id - nsd_msg.instance_ref_count = self._nsds[nsd_id].ref_count - nsd_list.append((nsd_refcount_xpath(nsd_id), nsd_msg)) - - return nsd_list @asyncio.coroutine def terminate_ns(self, nsr_id, xact): @@ -4099,10 +3982,10 @@ class NsManager(object): # Terminate the instances/networks assocaited with this nw service self._log.debug("Terminating the network service %s", nsr_id) - yield from self._nsrs[nsr_id].terminate() - - # Unref the NSD - yield from self.nsd_unref_by_nsr_id(nsr_id) + try : + yield from self._nsrs[nsr_id].terminate() + except Exception as e: + self.log.exception("Failed to terminate NSR[id=%s]", nsr_id) # Unpublish the NSR record self._log.debug("Unpublishing the network service %s", nsr_id) @@ -4185,9 +4068,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): assert action == rwdts.QueryAction.RPC try: - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ "instance_id": msg.instance_id}) @@ -4196,6 +4076,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_IN_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) except Exception as e: self.log.exception(e) xact_info.respond_xpath( @@ -4213,9 +4095,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): msg.instance_id = last_instance_id + 1 self.last_instance_id[scale_group] += 1 - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ "instance_id": msg.instance_id}) @@ -4224,6 +4103,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_OUT_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) except Exception as e: self.log.exception(e) xact_info.respond_xpath(