From 6c0af98a662f94fd796c77918953ce5ca8a33d45 Mon Sep 17 00:00:00 2001 From: Jithin Jose Date: Fri, 7 Apr 2017 02:48:57 -0400 Subject: [PATCH] Implement scaling RPCs using REST rather than netconf Signed-off-by: Jithin Jose --- .../tasklets/rwnsmtasklet/rwnsmtasklet.py | 151 ++++++++++-------- 1 file changed, 85 insertions(+), 66 deletions(-) diff --git a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py index 69fac685..85b31a5d 100755 --- a/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py +++ b/rwlaunchpad/plugins/rwnsm/rift/tasklets/rwnsmtasklet/rwnsmtasklet.py @@ -3619,13 +3619,18 @@ class NsManager(object): self._cloud_account_handler = cloud_account_handler self._ro_plugin_selector = ro_plugin_selector - self._ncclient = rift.mano.ncclient.NcClient( - host="127.0.0.1", - port=2022, - username="admin", - password="admin", - loop=self._loop) + # Intialize the set of variables for implementing Scaling RPC using REST. + self._headers = {"content-type":"application/json", "accept":"application/json"} + #This will break when we have rbac in the rift code and admin user password is changed or admin it self is removed. + self._user = 'admin' + self._password = 'admin' + self._ip = 'localhost' + self._rport = 8008 + self._conf_url = "https://{ip}:{port}/api/config". \ + format(ip=self._ip, + port=self._rport) + self._nsrs = {} self._nsds = {} self._vnfds = {} @@ -3757,66 +3762,82 @@ class NsManager(object): msg : RPC input action : Scaling Action """ - ScalingGroupInstance = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance - ScalingGroup = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup - - xpath = ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format( - msg.nsr_id_ref) - instance = ScalingGroupInstance.from_dict({"id": msg.instance_id}) - - @asyncio.coroutine - def get_nsr_scaling_group(): - results = yield from self._dts.query_read(xpath, rwdts.XactFlag.MERGE) - - for result in results: - res = yield from result - nsr_config = res.result - - for scaling_group in nsr_config.scaling_group: - if scaling_group.scaling_group_name_ref == msg.scaling_group_name_ref: - break - else: - scaling_group = nsr_config.scaling_group.add() - scaling_group.scaling_group_name_ref = msg.scaling_group_name_ref - - return (nsr_config, scaling_group) - - @asyncio.coroutine - def update_config(nsr_config): - xml = self._ncclient.convert_to_xml(RwNsrYang, nsr_config) - xml = '{}'.format(xml) - yield from self._ncclient.connect() - yield from self._ncclient.manager.edit_config(target="running", config=xml, default_operation="replace") - - @asyncio.coroutine + def get_scaling_group_information(): + scaling_group_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + output = requests.get(scaling_group_url, headers=self._headers, auth=(self._user, self._password), verify=False) + if output.text == None or len(output.text) == 0: + self.log.error("nsr id %s information not present", self._nsr_id) + return None + scaling_group_info = json.loads(output.text) + return scaling_group_info + + def config_scaling_group_information(scaling_group_info): + data_str = json.dumps(scaling_group_info) + self.log.debug("scaling group Info %s", data_str) + + scale_out_url = "{url}/ns-instance-config/nsr/{nsr_id}".format(url=self._conf_url, nsr_id=msg.nsr_id_ref) + response = requests.put(scale_out_url, data=data_str, verify=False, auth=(self._user, self._password), headers=self._headers) + response.raise_for_status() + def scale_out(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.append(instance) - yield from update_config(nsr_config) + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return + + scaling_group_present = False + if "scaling-group" in scaling_group_info["nsr:nsr"]: + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' not in scaling_group: + scaling_group['instance'] = [] + for instance in scaling_group['instance']: + if instance["id"] == int(msg.instance_id): + self.log.error("scaling group with instance id %s exists for scale out", msg.instance_id) + return + scaling_group["instance"].append({"id": int(msg.instance_id)}) + + if not scaling_group_present: + scaling_group_info["nsr:nsr"]["scaling-group"] = [{"scaling-group-name-ref": msg.scaling_group_name_ref, "instance": [{"id": msg.instance_id}]}] + + config_scaling_group_information(scaling_group_info) + return - @asyncio.coroutine def scale_in(): - nsr_config, scaling_group = yield from get_nsr_scaling_group() - scaling_group.instance.remove(instance) - yield from update_config(nsr_config) + scaling_group_info = get_scaling_group_information() + if scaling_group_info is None: + return + + scaling_group_array = scaling_group_info["nsr:nsr"]["scaling-group"] + scaling_group_present = False + instance_id_present = False + for scaling_group in scaling_group_array: + if scaling_group["scaling-group-name-ref"] == msg.scaling_group_name_ref: + scaling_group_present = True + if 'instance' in scaling_group: + instance_array = scaling_group["instance"]; + for index in range(len(instance_array)): + if instance_array[index]["id"] == int(msg.instance_id): + instance_array.pop(index) + instance_id_present = True + break + + if not scaling_group_present: + self.log.error("Scaling group %s doesnot exists for scale in", msg.scaling_group_name_ref) + return + + if not instance_id_present: + self.log.error("Instance id %s doesnot exists for scale in", msg.instance_id) + return + + config_scaling_group_information(scaling_group_info) + return if action == ScalingRpcHandler.ACTION.SCALE_OUT: - self._loop.create_task(scale_out()) + self._loop.run_in_executor(None, scale_out) else: - self._loop.create_task(scale_in()) - - # Opdata based calls, disabled for now! - # if action == ScalingRpcHandler.ACTION.SCALE_OUT: - # self.scale_nsr_out( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id, - # xact) - # else: - # self.scale_nsr_in( - # msg.nsr_id_ref, - # msg.scaling_group_name_ref, - # msg.instance_id) + self._loop.run_in_executor(None, scale_in) def nsr_update_cfg(self, nsr_id, msg): nsr = self._nsrs[nsr_id] @@ -4215,9 +4236,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): assert action == rwdts.QueryAction.RPC try: - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleIn.from_dict({ "instance_id": msg.instance_id}) @@ -4226,6 +4244,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_IN_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_IN) except Exception as e: self.log.exception(e) xact_info.respond_xpath( @@ -4243,9 +4263,6 @@ class ScalingRpcHandler(mano_dts.DtsHandler): msg.instance_id = last_instance_id + 1 self.last_instance_id[scale_group] += 1 - if self.callback: - self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) - rpc_op = NsrYang.YangOutput_Nsr_ExecScaleOut.from_dict({ "instance_id": msg.instance_id}) @@ -4254,6 +4271,8 @@ class ScalingRpcHandler(mano_dts.DtsHandler): self.__class__.SCALE_OUT_OUTPUT_XPATH, rpc_op) + if self.callback: + self.callback(xact_info.xact, msg, self.ACTION.SCALE_OUT) except Exception as e: self.log.exception(e) xact_info.respond_xpath( -- 2.25.1