X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwnsmtasklet.py;h=7cbc194b99dd568c7c62dd46bc080473b7756556;hb=375ebf79a245a2140c3eeaf82b1843d255c149fd;hp=853495214d49305ab1e382a0a06757d9c3fce357;hpb=c33673480c15d6e64402b547563640b2025a3cd6;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index 85349521..7cbc194b 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -51,7 +51,7 @@ from gi.repository import ( VnfrYang, RwVnfrYang, RwNsmYang, - RwsdnYang, + RwsdnalYang, RwDts as rwdts, RwTypes, ProtobufC, @@ -111,16 +111,10 @@ class NetworkServiceDescriptorNotFound(Exception): pass -class NetworkServiceDescriptorRefCountExists(Exception): +class NetworkServiceDescriptorNotFound(Exception): """ Network Service Descriptor reference count exists """ pass - -class NetworkServiceDescriptorUnrefError(Exception): - """ Failed to unref a network service descriptor """ - pass - - class NsrInstantiationFailed(Exception): """ Failed to instantiate network service """ pass @@ -383,7 +377,7 @@ class VnffgRecord(object): vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) - sff = RwsdnYang.VNFFGSff() + sff = RwsdnalYang.VNFFGSff() sff_list[nsr_vnfr.vnfd.id] = sff sff.name = nsr_vnfr.name sff.function_type = nsr_vnfr.vnfd.service_function_chain @@ -957,8 +951,8 @@ class VirtualNetworkFunctionRecord(object): vnfr_dict.update(vnfd_copy_dict) vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict) - - vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict()) + vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict(), + ignore_missing_keys=True) vnfr.member_vnf_index_ref = self.member_vnf_index vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict()) @@ -1072,6 +1066,9 @@ class VirtualNetworkFunctionRecord(object): cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint() cpr.name = conn_p.name cpr.type_yang = conn_p.type_yang + if conn_p.has_field('port_security_enabled'): + cpr.port_security_enabled = conn_p.port_security_enabled + vlr_ref = find_vlr_for_cp(conn_p) if vlr_ref is None: msg = "Failed to find VLR for cp = %s" % conn_p.name @@ -1293,6 +1290,7 @@ class NetworkServiceRecord(object): self._vnf_phase_completed = True self._op_status.set_state(state) + self._nsm_plugin.set_state(self.id, state) @property def id(self): @@ -1455,7 +1453,6 @@ class NetworkServiceRecord(object): for vlr in self._vlrs: yield from self.nsm_plugin.instantiate_vl(self, vlr) vlr.state = VlRecordState.ACTIVE - self.vlr_uptime_tasks[vlr.id] = self._loop.create_task(self.vlr_uptime_update(vlr)) def vlr_uptime_update(self, vlr): @@ -1697,7 +1694,6 @@ class NetworkServiceRecord(object): vnfr = yield from self.create_vnf_record(vnfd_msg, const_vnfd_msg, cloud_account_name, om_datacenter_name, group_name, index) scale_instance.add_vnfr(vnfr) vnfrs.append(vnfr) - return vnfrs @asyncio.coroutine @@ -1718,7 +1714,8 @@ class NetworkServiceRecord(object): format(group.name, index)) scale_instance.operational_status = "failed" else: - yield from self.instantiate_vnfs(vnfrs) + yield from self.instantiate_vnfs(vnfrs, scaleout=True) + except Exception as e: self._log.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}". @@ -2007,7 +2004,7 @@ class NetworkServiceRecord(object): (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index): group_info = self.resolve_placement_group_cloud_construct(group) if group_info is None: - self._log.error("Could not resolve cloud-construct for placement group: %s", group.name) + self._log.info("Could not resolve cloud-construct for placement group: %s", group.name) ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name)) else: self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)", @@ -2090,14 +2087,14 @@ class NetworkServiceRecord(object): return vnfr @asyncio.coroutine - def instantiate_vnfs(self, vnfrs): + def instantiate_vnfs(self, vnfrs, scaleout=False): """ This function instantiates VNFs for every VNF in this Network Service """ self._log.debug("Instantiating %u VNFs in NS %s", len(vnfrs), self.id) for vnf in vnfrs: self._log.debug("Instantiating VNF: %s in NS %s", vnf, self.id) - yield from self.nsm_plugin.instantiate_vnf(self, vnf) + yield from self.nsm_plugin.instantiate_vnf(self, vnf,scaleout) @asyncio.coroutine def instantiate_vnffgs(self): @@ -2235,14 +2232,6 @@ class NetworkServiceRecord(object): # Find the NSD self._nsd = self._nsr_cfg_msg.nsd - try: - # Update ref count if nsd present in catalog - self._nsm.get_nsd_ref(self.nsd_id) - - except NetworkServiceDescriptorError: - # This could be an NSD not in the nsd-catalog - pass - # Merge any config and initial config primitive values self.config_store.merge_nsd_config(self.nsd_msg) self._log.debug("Merged NSD: {}".format(self.nsd_msg.as_dict())) @@ -2346,9 +2335,12 @@ class NetworkServiceRecord(object): def on_instantiate_done(fut): # If the do_instantiate fails, then publish NSR with failed result - if fut.exception() is not None: - self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(fut.exception())) - self._loop.create_task(self.instantiation_failed(failed_reason=str(fut.exception()))) + e = fut.exception() + if e is not None: + import traceback, sys + print(traceback.format_exception(None,e, e.__traceback__), file=sys.stderr, flush=True) + self._log.error("NSR instantiation failed for NSR id %s: %s", self.id, str(e)) + self._loop.create_task(self.instantiation_failed(failed_reason=str(e))) instantiate_task = self._loop.create_task(do_instantiate()) instantiate_task.add_done_callback(on_instantiate_done) @@ -2698,8 +2690,6 @@ class NetworkServiceDescriptor(object): self._loop = loop self._nsd = nsd - self._ref_count = 0 - self._nsm = nsm @property @@ -2712,28 +2702,6 @@ class NetworkServiceDescriptor(object): """ Returns name of nsd """ return self._nsd.name - @property - def ref_count(self): - """ Returns reference count""" - return self._ref_count - - def in_use(self): - """ Returns whether nsd is in use or not """ - return True if self.ref_count > 0 else False - - def ref(self): - """ Take a reference on this object """ - self._ref_count += 1 - - def unref(self): - """ Release reference on this object """ - if self.ref_count < 1: - msg = ("Unref on a NSD object - nsd id %s, ref_count = %s" % - (self.id, self.ref_count)) - self._log.critical(msg) - raise NetworkServiceDescriptorError(msg) - self._ref_count -= 1 - @property def msg(self): """ Return the message associated with this NetworkServiceDescriptor""" @@ -2816,11 +2784,6 @@ class NsdDtsHandler(object): if fref.is_field_deleted(): # Delete an NSD record self._log.debug("Deleting NSD with id %s", msg.id) - if self._nsm.nsd_in_use(msg.id): - self._log.debug("Cannot delete NSD in use - %s", msg.id) - err = "Cannot delete an NSD in use - %s" % msg.id - raise NetworkServiceDescriptorRefCountExists(err) - yield from delete_nsd_libs(msg.id) self._nsm.delete_nsd(msg.id) else: @@ -3377,6 +3340,8 @@ class NsrDtsHandler(object): raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined") if msg.has_field("scaling_group"): + self._log.debug("ScaleMsg %s", msg) + self._log.debug("NSSCALINGSTATE %s", nsr.state) if nsr.state != NetworkServiceRecordState.RUNNING: raise ScalingOperationError("Unable to perform scaling action when NS is not in running state") @@ -3548,56 +3513,6 @@ class VnfrDtsHandler(object): flags=(rwdts.Flag.SUBSCRIBER),) -class NsdRefCountDtsHandler(object): - """ The NSD Ref Count DTS handler """ - XPATH = "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count" - - def __init__(self, dts, log, loop, nsm): - self._dts = dts - self._log = log - self._loop = loop - self._nsm = nsm - - self._regh = None - - @property - def regh(self): - """ Return registration handle """ - return self._regh - - @property - def nsm(self): - """ Return the NS manager instance """ - return self._nsm - - @asyncio.coroutine - def register(self): - """ Register for NSD ref count read from dts """ - - @asyncio.coroutine - def on_prepare(xact_info, action, ks_path, msg): - """ prepare callback from dts """ - xpath = ks_path.to_xpath(RwNsrYang.get_schema()) - - if action == rwdts.QueryAction.READ: - schema = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount.schema() - path_entry = schema.keyspec_to_entry(ks_path) - nsd_list = yield from self._nsm.get_nsd_refcount(path_entry.key00.nsd_id_ref) - for xpath, msg in nsd_list: - xact_info.respond_xpath(rsp_code=rwdts.XactRspCode.MORE, - xpath=xpath, - msg=msg) - xact_info.respond_xpath(rwdts.XactRspCode.ACK) - else: - raise NetworkServiceRecordError("Not supported operation %s" % action) - - hdl = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare,) - with self._dts.group_create() as group: - self._regh = group.register(xpath=NsdRefCountDtsHandler.XPATH, - handler=hdl, - flags=rwdts.Flag.PUBLISHER,) - - class NsManager(object): """ The Network Service Manager class""" def __init__(self, dts, log, loop, @@ -3614,12 +3529,17 @@ class NsManager(object): self._cloud_account_handler = cloud_account_handler self._ro_plugin_selector = ro_plugin_selector - self._ncclient = rift.mano.ncclient.NcClient( - host="127.0.0.1", - port=2022, - username="admin", - password="admin", - loop=self._loop) + + # Intialize the set of variables for implementing Scaling RPC using REST. + self._headers = {"content-type":"application/json", "accept":"application/json"} + #This will break when we have rbac in the rift code and admin user password is changed or admin it self is removed. + self._user = 'admin' + self._password = 'admin' + self._ip = 'localhost' + self._rport = 8008 + self._conf_url = "https://{ip}:{port}/api/config". \ + format(ip=self._ip, + port=self._rport) self._nsrs = {} self._nsds = {} @@ -3634,7 +3554,6 @@ class NsManager(object): self._vnfd_dts_handler = VnfdDtsHandler(dts, log, loop, self) self._dts_handlers = [self._nsd_dts_handler, VnfrDtsHandler(dts, log, loop, self), - NsdRefCountDtsHandler(dts, log, loop, self), NsrDtsHandler(dts, log, loop, self), ScalingRpcHandler(log, dts, loop, self.scale_rpc_callback), NsrRpcDtsHandler(dts,log,loop,self), @@ -3752,66 +3671,82 @@ class NsManager(object): msg : RPC input action : Scaling Action """ - ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance - ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup - - xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format( - msg.nsr_id_ref) - instance = ScalingGroupInstance.from_dict({"id": msg.instance_id}) + def get_scaling_group_information(): + scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False) + if output.text == None or len(output.text) == 0: + self.log.error("nsr id %s information not present", self._nsr_id) + return None + scaling_group_info = json.loads(output.text) + return scaling_group_info + + def config_scaling_group_information(scaling_group_info): + data_str = json.dumps(scaling_group_info) + self.log.debug("scaling group Info %s", data_str) + + scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + response = requests.put(scale_out_url, data=data_str, verify=False, auth=(self._user, self._password), headers=self._headers) + response.raise_for_status() - @asyncio.coroutine - def get_nsr_scaling_group(): - results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) - - for result in results: - res = yield from result - nsr_config = res.result + def scale_out(): + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return - for scaling_group in nsr_config.scaling_group: - if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref: - break - else: - scaling_group = nsr_config.scaling_group.add() - scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref + scaling_group_present = False + if "scaling-group" in scaling_group_info["nsr:nsr"]: + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' not in scaling_group: + scaling_group['instance'] = [] + for instance in scaling_group['instance']: + if instance["id"] == int(msg.instance_id): + self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id) + return + scaling_group["instance"].append({"id": int(msg.instance_id)}) + + if not scaling_group_present: + scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, "instance": [{"id": msg.instance_id}]}] + + config_scaling_group_information(scaling_group_info) + return - return (nsr_config, scaling_group) + def scale_in(): + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return - @asyncio.coroutine - def update_config(nsr_config): - xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config) - xml = '{}'.format(xml) - yield from self._ncclient.connect() - yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace") + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + scaling_group_present = False + instance_id_present = False + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' in scaling_group: + instance_array = scaling_group["instance"]; + for index in range(len(instance_array)): + if instance_array[index]["id"] == int(msg.instance_id): + instance_array.pop(index) + instance_id_present = True + break + + if not scaling_group_present: + self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref) + return - @asyncio.coroutine - def scale_out(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.append(instance) - yield from update_config(nsr_config) + if not instance_id_present: + self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id) + return - @asyncio.coroutine - def scale_in(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.remove(instance) - yield from update_config(nsr_config) + config_scaling_group_information(scaling_group_info) + return if action == ScalingRpcHandler.ACTION.SCALE_OUT: - self._loop.create_task(scale_out()) + self._loop.run_in_executor(None, scale_out) else: - self._loop.create_task(scale_in()) - - # Opdata based calls, disabled for now! - # if action == ScalingRpcHandler.ACTION.SCALE_OUT: - # self.scale_nsr_out( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id, - # xact) - # else: - # self.scale_nsr_in( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id) + self._loop.run_in_executor(None, scale_in) def nsr_update_cfg(self, nsr_id, msg): nsr = self._nsrs[nsr_id] @@ -3837,6 +3772,7 @@ class NsManager(object): def create_nsr(self, nsr_msg, key_pairs=None,restart_mode=False): """ Create an NSR instance """ + self._log.debug("NSRMSG %s", nsr_msg) if nsr_msg.id in self._nsrs: msg = "NSR id %s already exists" % nsr_msg.id self._log.error(msg) @@ -3906,13 +3842,6 @@ class NsManager(object): """ Delete VNFR with the passed id""" del self._vnfrs[vnfr_id] - def get_nsd_ref(self, nsd_id): - """ Get network service descriptor for the passed nsd_id - with a reference""" - nsd = self.get_nsd(nsd_id) - nsd.ref() - return nsd - @asyncio.coroutine def get_nsr_config(self, nsd_id): xpath = "C,/nsr:ns-instance-config" @@ -3928,33 +3857,6 @@ class NsManager(object): return None - @asyncio.coroutine - def nsd_unref_by_nsr_id(self, nsr_id): - """ Unref the network service descriptor based on NSR id """ - self._log.debug("NSR Unref called for Nsr Id:%s", nsr_id) - if nsr_id in self._nsrs: - nsr = self._nsrs[nsr_id] - - try: - nsd = self.get_nsd(nsr.nsd_id) - self._log.debug("Releasing ref on NSD %s held by NSR %s - Curr %d", - nsd.id, nsr.id, nsd.ref_count) - nsd.unref() - except NetworkServiceDescriptorError: - # We store a copy of NSD in NSR and the NSD in nsd-catalog - # could be deleted - pass - - else: - self._log.error("Cannot find NSR with id %s", nsr_id) - raise NetworkServiceDescriptorUnrefError("No NSR with id" % nsr_id) - - @asyncio.coroutine - def nsd_unref(self, nsd_id): - """ Unref the network service descriptor associated with the id """ - nsd = self.get_nsd(nsd_id) - nsd.unref() - def get_nsd(self, nsd_id): """ Get network service descriptor for the passed nsd_id""" if nsd_id not in self._nsds: @@ -3997,16 +3899,6 @@ class NsManager(object): if nsd_id not in self._nsds: self._log.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id) raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id) - - if nsd_id not in self._nsds: - self._log.debug("Cannot delete NSD id %s reference exists %s", - nsd_id, - self._nsds[nsd_id].ref_count) - raise NetworkServiceDescriptorRefCountExists( - "Cannot delete :%s, ref_count:%s", - nsd_id, - self._nsds[nsd_id].ref_count) - del self._nsds[nsd_id] def get_vnfd_config(self, xact): @@ -4059,13 +3951,6 @@ class NsManager(object): del self._vnfds[vnfd_id] - def nsd_in_use(self, nsd_id): - """ Is the NSD with the passed id in use """ - self._log.debug("Is this NSD in use - msg:%s", nsd_id) - if nsd_id in self._nsds: - return self._nsds[nsd_id].in_use() - return False - @asyncio.coroutine def publish_nsr(self, xact, path, msg): """ Publish a NSR """ @@ -4088,29 +3973,6 @@ class NsManager(object): raise VirtualNetworkFunctionRecordError(err) self._vnfrs[vnfr_id].is_ready() - @asyncio.coroutine - def get_nsd_refcount(self, nsd_id): - """ Get the nsd_list from this NSM""" - - def nsd_refcount_xpath(nsd_id): - """ xpath for ref count entry """ - return (NsdRefCountDtsHandler.XPATH + - "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id) - - nsd_list = [] - if nsd_id is None or nsd_id == "": - for nsd in self._nsds.values(): - nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount() - nsd_msg.nsd_id_ref = nsd.id - nsd_msg.instance_ref_count = nsd.ref_count - nsd_list.append((nsd_refcount_xpath(nsd.id), nsd_msg)) - elif nsd_id in self._nsds: - nsd_msg = RwNsrYang.YangData_Nsr_NsInstanceOpdata_NsdRefCount() - nsd_msg.nsd_id_ref = self._nsds[nsd_id].id - nsd_msg.instance_ref_count = self._nsds[nsd_id].ref_count - nsd_list.append((nsd_refcount_xpath(nsd_id), nsd_msg)) - - return nsd_list @asyncio.coroutine def terminate_ns(self, nsr_id, xact): @@ -4125,9 +3987,6 @@ class NsManager(object): except Exception as e: self.log.exception("Failed to terminate NSR[id=%s]", nsr_id) - # Unref the NSD - yield from self.nsd_unref_by_nsr_id(nsr_id) - # Unpublish the NSR record self._log.debug("Unpublishing the network service %s", nsr_id) yield from self._nsrs[nsr_id].unpublish(xact) @@ -4209,9 +4068,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): assert action == rwdts.QueryAction.RPC try: - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ "instance_id": msg.instance_id}) @@ -4220,6 +4076,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_IN_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) except Exception as e: self.log.exception(e) xact_info.respond_xpath( @@ -4237,9 +4095,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): msg.instance_id = last_instance_id + 1 self.last_instance_id[scale_group] += 1 - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ "instance_id": msg.instance_id}) @@ -4248,6 +4103,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_OUT_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) except Exception as e: self.log.exception(e) xact_info.respond_xpath(