X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwnsm%2Frift%2Ftasklets%2Frwnsmtasklet%2Frwnsmtasklet.py;h=7572fe9afa1c3b8fb7a6d317cd459df4357a6ba3;hb=c2fe40c38153248d1a9f436241c65d5f43e5a900;hp=da9807a6d9c7463628ed4c612d8afab32abe147d;hpb=c55a47913f26cc903d347a5c2dbdfd629936b5d0;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index da9807a6..7572fe9a 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -35,7 +35,7 @@ from enum import Enum import gi gi.require_version('RwYang', '1.0') -gi.require_version('RwNsdYang', '1.0') +gi.require_version('NsdYang', '1.0') gi.require_version('RwDts', '1.0') gi.require_version('RwNsmYang', '1.0') gi.require_version('RwNsrYang', '1.0') @@ -51,7 +51,7 @@ from gi.repository import ( VnfrYang, RwVnfrYang, RwNsmYang, - RwsdnYang, + RwsdnalYang, RwDts as rwdts, RwTypes, ProtobufC, @@ -383,7 +383,7 @@ class VnffgRecord(object): vnfr = yield from self._nsr.fetch_vnfr(nsr_vnfr.xpath) self._log.debug("Received VNFR is %s", vnfr) - sff = RwsdnYang.VNFFGSff() + sff = RwsdnalYang.VNFFGSff() sff_list[nsr_vnfr.vnfd.id] = sff sff.name = nsr_vnfr.name sff.function_type = nsr_vnfr.vnfd.service_function_chain @@ -2011,7 +2011,7 @@ class NetworkServiceRecord(object): (member_vnfd.member_vnf_index_ref == const_vnfd.member_vnf_index): group_info = self.resolve_placement_group_cloud_construct(group) if group_info is None: - self._log.error("Could not resolve cloud-construct for placement group: %s", group.name) + self._log.info("Could not resolve cloud-construct for placement group: %s", group.name) ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name)) else: self._log.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)", @@ -2822,9 +2822,14 @@ class NsdDtsHandler(object): # Delete an NSD record self._log.debug("Deleting NSD with id %s", msg.id) if self._nsm.nsd_in_use(msg.id): - self._log.debug("Cannot delete NSD in use - %s", msg.id) - err = "Cannot delete an NSD in use - %s" % msg.id - raise NetworkServiceDescriptorRefCountExists(err) + errmsg = "Cannot delete an NSD in use - %s" % msg.id + self._log.error(errmsg) + xpath = ks_path.to_xpath(NsdYang.get_schema()) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + xpath, + errmsg) + xact_info.respond_xpath(rwdts.XactRspCode.NACK) + return yield from delete_nsd_libs(msg.id) self._nsm.delete_nsd(msg.id) @@ -3010,9 +3015,18 @@ class NsrRpcDtsHandler(object): "nsr_id":str(uuid.uuid4()) }) - if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): - self._log.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip)) - + if not ('name' in rpc_ip and 'nsd_ref' in rpc_ip and + ('cloud_account' in rpc_ip or 'om_datacenter' in rpc_ip)): + errmsg = ( + "Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}". + format(rpc_ip)) + self._log.error(errmsg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH, + errmsg) + xact_info.respond_xpath(rwdts.XactRspCode.NACK, + NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH) + return self._log.debug("start-network-service RPC input: {}".format(rpc_ip)) @@ -3022,9 +3036,6 @@ class NsrRpcDtsHandler(object): nsd_copy = self.nsm.get_nsd(rpc_ip.nsd_ref) - #if not self._manager: - # self._manager = yield from self._connect() - self._log.debug("Configuring ns-instance-config with name %s nsd-ref: %s", rpc_ip.name, rpc_ip.nsd_ref) @@ -3038,18 +3049,10 @@ class NsrRpcDtsHandler(object): ns_instance_config.nsd.from_dict(nsd_copy.msg.as_dict()) payload_dict = ns_instance_config.to_json(self._model) - #xml = ns_instance_config.to_xml_v2(self._model) - #netconf_xml = self.wrap_netconf_config_xml(xml) - #self._log.debug("Sending configure ns-instance-config xml to %s: %s", - # netconf_xml, NsrRpcDtsHandler.NETCONF_IP_ADDRESS) self._log.debug("Sending configure ns-instance-config json to %s: %s", self._nsr_config_url,ns_instance_config) - #response = yield from self._manager.edit_config( - # target="running", - # config=netconf_xml, - # ) response = yield from self._loop.run_in_executor( None, self._apply_ns_instance_config, @@ -3062,13 +3065,15 @@ class NsrRpcDtsHandler(object): NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH, rpc_op) except Exception as e: - self._log.error("Exception processing the " - "start-network-service: {}".format(e)) - self._log.exception(e) + errmsg = ("Exception processing the " + "start-network-service: {}".format(e)) + self._log.exception(errmsg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH, + errmsg) xact_info.respond_xpath(rwdts.XactRspCode.NACK, NsrRpcDtsHandler.EXEC_NSR_CONF_O_XPATH) - hdl_ns = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_ns_config_prepare,) with self._dts.group_create() as group: @@ -3361,43 +3366,76 @@ class NsrDtsHandler(object): fref = ProtobufC.FieldReference.alloc() fref.goto_whole_message(msg.to_pbcm()) + def send_err_msg(err_msg): + self._log.error(errmsg) + xact_info.send_error_xpath(RwTypes.RwStatus.FAILURE, + xpath, + errmsg) + xact_info.respond_xpath(rwdts.XactRspCode.NACK) + + if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE, rwdts.QueryAction.DELETE]: # if this is an NSR create if action != rwdts.QueryAction.DELETE and msg.id not in self._nsm.nsrs: # Ensure the Cloud account/datacenter has been specified if not msg.has_field("cloud_account") and not msg.has_field("om_datacenter"): - raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR") + errmsg = ("Cloud account or datacenter not specified in NS {}". + format(msg.name)) + send_err_msg(errmsg) + return # Check if nsd is specified if not msg.has_field("nsd"): - raise NsrInstantiationFailed("NSD not specified in NSR") + errmsg = ("NSD not specified in NS {}". + format(msg.name)) + send_err_msg(errmsg) + return else: nsr = self._nsm.nsrs[msg.id] if msg.has_field("nsd"): if nsr.state != NetworkServiceRecordState.RUNNING: - raise NsrVlUpdateError("Unable to update VL when NSR not in running state") + errmsg = ("Unable to update VL when NS {} not in running state". + format(msg.name)) + send_err_msg(errmsg) + return + if 'vld' not in msg.nsd or len(msg.nsd.vld) == 0: - raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined") + errmsg = ("NS config {} NSD should have atleast 1 VLD". + format(msg.name)) + send_err_msg(errmsg) + return if msg.has_field("scaling_group"): if nsr.state != NetworkServiceRecordState.RUNNING: - raise ScalingOperationError("Unable to perform scaling action when NS is not in running state") + errmsg = ("Unable to perform scaling action when NS {} not in running state". + format(msg.name)) + send_err_msg(errmsg) + return if len(msg.scaling_group) > 1: - raise ScalingOperationError("Only a single scaling group can be configured at a time") + errmsg = ("Only a single scaling group can be configured at a time for NS {}". + format(msg.name)) + send_err_msg(errmsg) + return for group_msg in msg.scaling_group: num_new_group_instances = len(group_msg.instance) if num_new_group_instances > 1: - raise ScalingOperationError("Only a single scaling instance can be modified at a time") + errmsg = ("Only a single scaling instance can be modified at a time for NS {}". + format(msg.name)) + send_err_msg(errmsg) + return elif num_new_group_instances == 1: scale_group = nsr.scaling_groups[group_msg.scaling_group_name_ref] if action in [rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE]: if len(scale_group.instances) == scale_group.max_instance_count: - raise ScalingOperationError("Max instances for %s reached" % scale_group) + errmsg = (" Max instances for {} reached for NS {}". + format(str(scale_group), msg.name)) + send_err_msg(errmsg) + return acg.handle.prepare_complete_ok(xact_info.handle) @@ -3619,13 +3657,18 @@ class NsManager(object): self._cloud_account_handler = cloud_account_handler self._ro_plugin_selector = ro_plugin_selector - self._ncclient = rift.mano.ncclient.NcClient( - host="127.0.0.1", - port=2022, - username="admin", - password="admin", - loop=self._loop) + # Intialize the set of variables for implementing Scaling RPC using REST. + self._headers = {"content-type":"application/json", "accept":"application/json"} + #This will break when we have rbac in the rift code and admin user password is changed or admin it self is removed. + self._user = 'admin' + self._password = 'admin' + self._ip = 'localhost' + self._rport = 8008 + self._conf_url = "https://{ip}:{port}/api/config". \ + format(ip=self._ip, + port=self._rport) + self._nsrs = {} self._nsds = {} self._vnfds = {} @@ -3757,66 +3800,82 @@ class NsManager(object): msg : RPC input action : Scaling Action """ - ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance - ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup - - xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format( - msg.nsr_id_ref) - instance = ScalingGroupInstance.from_dict({"id": msg.instance_id}) - - @asyncio.coroutine - def get_nsr_scaling_group(): - results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) - - for result in results: - res = yield from result - nsr_config = res.result - - for scaling_group in nsr_config.scaling_group: - if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref: - break - else: - scaling_group = nsr_config.scaling_group.add() - scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref - - return (nsr_config, scaling_group) - - @asyncio.coroutine - def update_config(nsr_config): - xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config) - xml = '{}'.format(xml) - yield from self._ncclient.connect() - yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace") - - @asyncio.coroutine + def get_scaling_group_information(): + scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False) + if output.text == None or len(output.text) == 0: + self.log.error("nsr id %s information not present", self._nsr_id) + return None + scaling_group_info = json.loads(output.text) + return scaling_group_info + + def config_scaling_group_information(scaling_group_info): + data_str = json.dumps(scaling_group_info) + self.log.debug("scaling group Info %s", data_str) + + scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + response = requests.put(scale_out_url, data=data_str, verify=False, auth=(self._user, self._password), headers=self._headers) + response.raise_for_status() + def scale_out(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.append(instance) - yield from update_config(nsr_config) + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return + + scaling_group_present = False + if "scaling-group" in scaling_group_info["nsr:nsr"]: + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' not in scaling_group: + scaling_group['instance'] = [] + for instance in scaling_group['instance']: + if instance["id"] == int(msg.instance_id): + self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id) + return + scaling_group["instance"].append({"id": int(msg.instance_id)}) + + if not scaling_group_present: + scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, "instance": [{"id": msg.instance_id}]}] + + config_scaling_group_information(scaling_group_info) + return - @asyncio.coroutine def scale_in(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.remove(instance) - yield from update_config(nsr_config) + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return + + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + scaling_group_present = False + instance_id_present = False + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' in scaling_group: + instance_array = scaling_group["instance"]; + for index in range(len(instance_array)): + if instance_array[index]["id"] == int(msg.instance_id): + instance_array.pop(index) + instance_id_present = True + break + + if not scaling_group_present: + self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref) + return + + if not instance_id_present: + self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id) + return + + config_scaling_group_information(scaling_group_info) + return if action == ScalingRpcHandler.ACTION.SCALE_OUT: - self._loop.create_task(scale_out()) + self._loop.run_in_executor(None, scale_out) else: - self._loop.create_task(scale_in()) - - # Opdata based calls, disabled for now! - # if action == ScalingRpcHandler.ACTION.SCALE_OUT: - # self.scale_nsr_out( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id, - # xact) - # else: - # self.scale_nsr_in( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id) + self._loop.run_in_executor(None, scale_in) def nsr_update_cfg(self, nsr_id, msg): nsr = self._nsrs[nsr_id] @@ -4215,9 +4274,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): assert action == rwdts.QueryAction.RPC try: - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ "instance_id": msg.instance_id}) @@ -4226,6 +4282,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_IN_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) except Exception as e: self.log.exception(e) xact_info.respond_xpath( @@ -4243,9 +4301,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): msg.instance_id = last_instance_id + 1 self.last_instance_id[scale_group] += 1 - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ "instance_id": msg.instance_id}) @@ -4254,6 +4309,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_OUT_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) except Exception as e: self.log.exception(e) xact_info.respond_xpath(