X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwnsmtasklet.py;h=85b31a5d26c9f4e9365e37cb0989431197cc407c;hb=6c0af98a662f94fd796c77918953ce5ca8a33d45;hp=9877ae9ca11725588ce4d10c1a5e13a4b783ae73;hpb=d0d7b8a40207d7c67e72c65879226c441e443d05;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index 9877ae9c..85b31a5d 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -1,4 +1,4 @@ -# +# # Copyright 2016 RIFT.IO Inc # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -51,7 +51,7 @@ from gi.repository import ( VnfrYang, RwVnfrYang, RwNsmYang, - RwsdnYang, + RwsdnalYang, RwDts as rwdts, RwTypes, ProtobufC, @@ -220,7 +220,6 @@ class VnffgRecord(object): if self._vnffgr_state == VnffgRecordState.INIT: vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, @@ -229,7 +228,6 @@ class VnffgRecord(object): vnffgr = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr.from_dict(vnffgr_dict) elif self._vnffgr_state == VnffgRecordState.TERMINATED: vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, @@ -243,7 +241,6 @@ class VnffgRecord(object): self._log.exception("Fetching VNFFGR for VNFFG with id %s failed", self._vnffgr_id) self._vnffgr_state = VnffgRecordState.FAILED vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, @@ -257,7 +254,6 @@ class VnffgRecord(object): def vnffgr_create_msg(self): """ Virtual Link Record message for Creating VLR in VNS """ vnffgr_dict = {"id": self._vnffgr_id, - "nsd_id": self._nsr.nsd_id, "vnffgd_id_ref": self._vnffgd_msg.id, "vnffgd_name_ref": self._vnffgd_msg.name, "sdn_account": self._sdn_account_name, @@ -387,7 +383,7 @@ class VnffgRecord(object): vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) - sff = RwsdnYang.VNFFGSff() + sff = RwsdnalYang.VNFFGSff() sff_list[nsr_vnfr.vnfd.id] = sff sff.name = nsr_vnfr.name sff.function_type = nsr_vnfr.vnfd.service_function_chain @@ -460,6 +456,7 @@ class VnffgRecord(object): class VirtualLinkRecord(object): """ Virtual Link Records class""" + XPATH = "D,/vlr:vlr-catalog/vlr:vlr" @staticmethod @asyncio.coroutine def create_record(dts, log, loop, nsr_name, vld_msg, cloud_account_name, om_datacenter, ip_profile, nsr_id, restart_mode=False): @@ -515,7 +512,8 @@ class VirtualLinkRecord(object): self._vlr_id = str(uuid.uuid4()) self._state = VlRecordState.INIT self._prev_state = None - + self._create_time = int(time.time()) + @property def xpath(self): """ path for this object """ @@ -608,6 +606,7 @@ class VirtualLinkRecord(object): "nsr_id_ref": self._nsr_id, "vld_ref": self.vld_msg.id, "name": self.name, + "create_time": self._create_time, "cloud_account": self.cloud_account_name, "om_datacenter": self.om_datacenter_name, } @@ -645,7 +644,6 @@ class VirtualLinkRecord(object): @asyncio.coroutine def instantiate(self): """ Instantiate this VL """ - self._log.debug("Instaniating VLR key %s, vld %s", self.xpath, self._vld_msg) vlr = None @@ -794,6 +792,7 @@ class VirtualNetworkFunctionRecord(object): self._group_instance_id = group_instance_id self._placement_groups = placement_groups self._config_status = NsrYang.ConfigStates.INIT + self._create_time = int(time.time()) self._prev_state = VnfRecordState.INIT self._state = VnfRecordState.INIT @@ -950,7 +949,6 @@ class VirtualNetworkFunctionRecord(object): vnfr_dict = { "id": self.id, "nsr_id_ref": self._nsr_id, - "vnfd_ref": self.vnfd.id, "name": self.name, "cloud_account": self._cloud_account_name, "om_datacenter": self._om_datacenter_name, @@ -959,6 +957,8 @@ class VirtualNetworkFunctionRecord(object): vnfr_dict.update(vnfd_copy_dict) vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict(vnfr_dict) + vnfr.vnfd = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict(self.vnfd.as_dict(), + ignore_missing_keys=True) vnfr.member_vnf_index_ref = self.member_vnf_index vnfr.vnf_configuration.from_dict(self._vnfd.vnf_configuration.as_dict()) @@ -1072,6 +1072,9 @@ class VirtualNetworkFunctionRecord(object): cpr = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint() cpr.name = conn_p.name cpr.type_yang = conn_p.type_yang + if conn_p.has_field('port_security_enabled'): + cpr.port_security_enabled = conn_p.port_security_enabled + vlr_ref = find_vlr_for_cp(conn_p) if vlr_ref is None: msg = "Failed to find VLR for cp = %s" % conn_p.name @@ -1082,7 +1085,7 @@ class VirtualNetworkFunctionRecord(object): cpr.vlr_ref = vlr_ref.id self.vnfr_msg.connection_point.append(cpr) self._log.debug("Connection point [%s] added, vnf id=%s vnfd id=%s", - cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd_ref) + cpr, self.vnfr_msg.id, self.vnfr_msg.vnfd.id) if not self.restart_mode: yield from self._dts.query_create(self.xpath, @@ -1231,7 +1234,8 @@ class NetworkServiceRecord(object): """ Network service record """ XPATH = "D,/nsr:ns-instance-opdata/nsr:nsr" - def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False): + def __init__(self, dts, log, loop, nsm, nsm_plugin, nsr_cfg_msg, sdn_account_name, key_pairs, restart_mode=False, + vlr_handler=None): self._dts = dts self._log = log self._loop = loop @@ -1239,6 +1243,7 @@ class NetworkServiceRecord(object): self._nsr_cfg_msg = nsr_cfg_msg self._nsm_plugin = nsm_plugin self._sdn_account_name = sdn_account_name + self._vlr_handler = vlr_handler self._nsd = None self._nsr_msg = None @@ -1261,6 +1266,7 @@ class NetworkServiceRecord(object): self._is_active = False self._vl_phase_completed = False self._vnf_phase_completed = False + self.vlr_uptime_tasks = {} # Initalise the state to init @@ -1290,6 +1296,7 @@ class NetworkServiceRecord(object): self._vnf_phase_completed = True self._op_status.set_state(state) + self._nsm_plugin.set_state(self.id, state) @property def id(self): @@ -1452,6 +1459,21 @@ class NetworkServiceRecord(object): for vlr in self._vlrs: yield from self.nsm_plugin.instantiate_vl(self, vlr) vlr.state = VlRecordState.ACTIVE + self.vlr_uptime_tasks[vlr.id] = self._loop.create_task(self.vlr_uptime_update(vlr)) + + + def vlr_uptime_update(self, vlr): + try: + + vlr_ = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.from_dict({'id': vlr.id}) + while True: + vlr_.uptime = int(time.time()) - vlr._create_time + yield from self._vlr_handler.update(None, VirtualLinkRecord.vlr_xpath(vlr), vlr_) + yield from asyncio.sleep(2, loop=self._loop) + except asyncio.CancelledError: + self._log.debug("Received cancellation request for vlr_uptime_update task") + yield from self._vlr_handler.delete(None, VirtualLinkRecord.vlr_xpath(vlr)) + @asyncio.coroutine def create(self, config_xact): @@ -1807,19 +1829,20 @@ class NetworkServiceRecord(object): self._vnffgrs[vnffgr.id] = vnffgr def resolve_vld_ip_profile(self, nsd_msg, vld): + self._log.debug("Receieved ip profile ref is %s",vld.ip_profile_ref) if not vld.has_field('ip_profile_ref'): return None - profile = [ profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref ] + profile = [profile for profile in nsd_msg.ip_profiles if profile.name == vld.ip_profile_ref] return profile[0] if profile else None @asyncio.coroutine def _create_vls(self, vld, cloud_account,om_datacenter): """Create a VLR in the cloud account specified using the given VLD - + Args: vld : VLD yang obj cloud_account : Cloud account name - + Returns: VirtualLinkRecord """ @@ -1887,6 +1910,7 @@ class NetworkServiceRecord(object): """ This function creates VLs for every VLD in the NSD associated with this NSR""" for vld in self.nsd_msg.vld: + self._log.debug("Found vld %s in nsr id %s", vld, self.id) cloud_account_list = self._extract_cloud_accounts_for_vl(vld) for cloud_account,om_datacenter in cloud_account_list: @@ -1913,8 +1937,8 @@ class NetworkServiceRecord(object): if vlr is None: cloud_account_list = self._extract_cloud_accounts_for_vl(vld) - for account in cloud_account_list: - vlr = yield from self._create_vls(vld, account) + for account,om_datacenter in cloud_account_list: + vlr = yield from self._create_vls(vld, account,om_datacenter) self._vlrs.append(vlr) vlr.state = VlRecordState.INSTANTIATION_PENDING @@ -1987,7 +2011,7 @@ class NetworkServiceRecord(object): (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index): group_info = self.resolve_placement_group_cloud_construct(group) if group_info is None: - self._log.error("Could not resolve cloud-construct for placement group: %s", group.name) + self._log.info("Could not resolve cloud-construct for placement group: %s", group.name) ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name)) else: self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)", @@ -2002,10 +2026,11 @@ class NetworkServiceRecord(object): # Fetch the VNFD associated with this VNF placement_groups = self.get_placement_groups(vnfd_msg, const_vnfd) self._log.info("Cloud Account for VNF %d is %s",const_vnfd.member_vnf_index,cloud_account_name) - self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s", + self._log.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s, restart mode self.restart_mode %s", vnfd_msg.name, const_vnfd.member_vnf_index, - [ group.name for group in placement_groups]) + [ group.name for group in placement_groups], + self.restart_mode) vnfr = yield from VirtualNetworkFunctionRecord.create_record(self._dts, self._log, self._loop, @@ -2398,6 +2423,8 @@ class NetworkServiceRecord(object): for vlr in self.vlrs: yield from self.nsm_plugin.terminate_vl(vlr) vlr.state = VlRecordState.TERMINATED + if vlr.id in self.vlr_uptime_tasks: + self.vlr_uptime_tasks[vlr.id].cancel() self._log.debug("Terminating network service id %s", self.id) @@ -2471,6 +2498,7 @@ class NetworkServiceRecord(object): nsr.config_status = self.map_config_status() nsr.config_status_details = self._config_status_details nsr.create_time = self._create_time + nsr.uptime = int(time.time()) - self._create_time for cfg_prim in self.nsd_msg.service_primitive: cfg_prim = NsrYang.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive.from_dict( @@ -2981,22 +3009,22 @@ class NsrRpcDtsHandler(object): rpc_op = NsrYang.YangOutput_Nsr_StartNetworkService.from_dict({ "nsr_id":str(uuid.uuid4()) }) - + if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip)) - + self._log.debug("start-network-service RPC input: {}".format(rpc_ip)) try: # Add used value to the pool self._log.debug("RPC output: {}".format(rpc_op)) - + nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref) #if not self._manager: # self._manager = yield from self._connect() - + self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s", rpc_ip.name, rpc_ip.nsd_ref) @@ -3201,10 +3229,10 @@ class NsrDtsHandler(object): def get_nsr_key_pairs(dts_member_reg, xact): key_pairs = {} for instance_cfg, keyspec in dts_member_reg.get_xact_elements(xact, include_keyspec=True): - self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec)) + self._log.debug("Key pair received is {} KS: {}".format(instance_cfg, keyspec)) xpath = keyspec.to_xpath(RwNsrYang.get_schema()) key_pairs[instance_cfg.name] = instance_cfg - return key_pairs + return key_pairs def on_apply(dts, acg, xact, action, scratch): """Apply the configuration""" @@ -3591,13 +3619,18 @@ class NsManager(object): self._cloud_account_handler = cloud_account_handler self._ro_plugin_selector = ro_plugin_selector - self._ncclient = rift.mano.ncclient.NcClient( - host="127.0.0.1", - port=2022, - username="admin", - password="admin", - loop=self._loop) + # Intialize the set of variables for implementing Scaling RPC using REST. + self._headers = {"content-type":"application/json", "accept":"application/json"} + #This will break when we have rbac in the rift code and admin user password is changed or admin it self is removed. + self._user = 'admin' + self._password = 'admin' + self._ip = 'localhost' + self._rport = 8008 + self._conf_url = "https://{ip}:{port}/api/config". \ + format(ip=self._ip, + port=self._rport) + self._nsrs = {} self._nsds = {} self._vnfds = {} @@ -3729,67 +3762,83 @@ class NsManager(object): msg : RPC input action : Scaling Action """ - ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance - ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup - - xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format( - msg.nsr_id_ref) - instance = ScalingGroupInstance.from_dict({"id": msg.instance_id}) - - @asyncio.coroutine - def get_nsr_scaling_group(): - results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) - - for result in results: - res = yield from result - nsr_config = res.result - - for scaling_group in nsr_config.scaling_group: - if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref: - break - else: - scaling_group = nsr_config.scaling_group.add() - scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref - - return (nsr_config, scaling_group) - - @asyncio.coroutine - def update_config(nsr_config): - xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config) - xml = '{}'.format(xml) - yield from self._ncclient.connect() - yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace") + def get_scaling_group_information(): + scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False) + if output.text == None or len(output.text) == 0: + self.log.error("nsr id %s information not present", self._nsr_id) + return None + scaling_group_info = json.loads(output.text) + return scaling_group_info + + def config_scaling_group_information(scaling_group_info): + data_str = json.dumps(scaling_group_info) + self.log.debug("scaling group Info %s", data_str) - @asyncio.coroutine + scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + response = requests.put(scale_out_url, data=data_str, verify=False, auth=(self._user, self._password), headers=self._headers) + response.raise_for_status() + def scale_out(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.append(instance) - yield from update_config(nsr_config) + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return + + scaling_group_present = False + if "scaling-group" in scaling_group_info["nsr:nsr"]: + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' not in scaling_group: + scaling_group['instance'] = [] + for instance in scaling_group['instance']: + if instance["id"] == int(msg.instance_id): + self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id) + return + scaling_group["instance"].append({"id": int(msg.instance_id)}) + + if not scaling_group_present: + scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, "instance": [{"id": msg.instance_id}]}] + + config_scaling_group_information(scaling_group_info) + return - @asyncio.coroutine def scale_in(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.remove(instance) - yield from update_config(nsr_config) + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return + + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + scaling_group_present = False + instance_id_present = False + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' in scaling_group: + instance_array = scaling_group["instance"]; + for index in range(len(instance_array)): + if instance_array[index]["id"] == int(msg.instance_id): + instance_array.pop(index) + instance_id_present = True + break + + if not scaling_group_present: + self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref) + return + + if not instance_id_present: + self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id) + return + + config_scaling_group_information(scaling_group_info) + return if action == ScalingRpcHandler.ACTION.SCALE_OUT: - self._loop.create_task(scale_out()) + self._loop.run_in_executor(None, scale_out) else: - self._loop.create_task(scale_in()) - - # Opdata based calls, disabled for now! - # if action == ScalingRpcHandler.ACTION.SCALE_OUT: - # self.scale_nsr_out( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id, - # xact) - # else: - # self.scale_nsr_in( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id) - + self._loop.run_in_executor(None, scale_in) + def nsr_update_cfg(self, nsr_id, msg): nsr = self._nsrs[nsr_id] nsr.nsr_cfg_msg= msg @@ -3819,9 +3868,10 @@ class NsManager(object): self._log.error(msg) raise NetworkServiceRecordError(msg) - self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s", + self._log.info("Create NetworkServiceRecord nsr id %s from nsd_id %s, restart mode %s", nsr_msg.id, - nsr_msg.nsd.id) + nsr_msg.nsd.id, + restart_mode) nsm_plugin = self._ro_plugin_selector.ro_plugin sdn_account_name = self._cloud_account_handler.get_cloud_account_sdn_name(nsr_msg.cloud_account) @@ -3834,7 +3884,8 @@ class NsManager(object): nsr_msg, sdn_account_name, key_pairs, - restart_mode=restart_mode + restart_mode=restart_mode, + vlr_handler=self._ro_plugin_selector._records_publisher._vlr_pub_hdlr ) self._nsrs[nsr_msg.id] = nsr nsm_plugin.create_nsr(nsr_msg, nsr_msg.nsd, key_pairs) @@ -3965,7 +4016,7 @@ class NsManager(object): self.create_nsd(nsd) else: self._log.debug("Updating NSD id = %s, nsd = %s", nsd.id, nsd) - self._nsds[nsd.id].update(nsd) + self._nsds[nsd.id].update(nsd) def delete_nsd(self, nsd_id): """ Delete the Network service descriptor with the passed id """ @@ -4017,9 +4068,6 @@ class NsManager(object): """ Update the virtual network function descriptor """ self._log.debug("Update virtual network function descriptor- %s", vnfd) - # Hack to remove duplicates from leaf-lists - to be fixed by RIFT-6511 - for ivld in vnfd.internal_vld: - ivld.internal_connection_point_ref = list(set(ivld.internal_connection_point_ref)) if vnfd.id not in self._vnfds: self._log.debug("No VNFD found - creating VNFD id = %s", vnfd.id) @@ -4099,7 +4147,10 @@ class NsManager(object): # Terminate the instances/networks assocaited with this nw service self._log.debug("Terminating the network service %s", nsr_id) - yield from self._nsrs[nsr_id].terminate() + try : + yield from self._nsrs[nsr_id].terminate() + except Exception as e: + self.log.exception("Failed to terminate NSR[id=%s]", nsr_id) # Unref the NSD yield from self.nsd_unref_by_nsr_id(nsr_id) @@ -4185,9 +4236,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): assert action == rwdts.QueryAction.RPC try: - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ "instance_id": msg.instance_id}) @@ -4196,6 +4244,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_IN_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) except Exception as e: self.log.exception(e) xact_info.respond_xpath( @@ -4213,9 +4263,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): msg.instance_id = last_instance_id + 1 self.last_instance_id[scale_group] += 1 - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ "instance_id": msg.instance_id}) @@ -4224,6 +4271,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_OUT_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) except Exception as e: self.log.exception(e) xact_info.respond_xpath(