X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Fns.py;h=a04f0a85207dc020863bc3506a01c5c573ada00c;hb=d124bfe3d2ac63a95fd2af44db3184985ab74d75;hp=b9e70d34902a643c232640bc25a9456296f0f64c;hpb=98ad6ea9d8962a61168bac87c0b1f52c7287750f;p=osm%2FLCM.git diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index b9e70d3..a04f0a8 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -628,6 +628,12 @@ class NsLcm(LcmBase): raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index)) async def instantiate(self, nsr_id, nslcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database @@ -644,6 +650,9 @@ class NsLcm(LcmBase): n2vc_key_list = [] # list of public keys to be injected as authorized to VMs exc = None try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) step = "Getting nsr={} from db".format(nsr_id) @@ -652,17 +661,6 @@ class NsLcm(LcmBase): nsd = db_nsr["nsd"] nsr_name = db_nsr["name"] # TODO short-name?? - # look if previous tasks in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id) - if task_dependency: - step = db_nslcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - step = "Getting vnfrs from db" db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) db_vnfds_ref = {} @@ -722,7 +720,7 @@ class NsLcm(LcmBase): machine_spec = {} if native_charm: machine_spec["username"] = charm_params.get("username"), - machine_spec["username"] = charm_params.get("rw_mgmt_ip") + machine_spec["hostname"] = charm_params.get("rw_mgmt_ip") # Note: The charm needs to exist on disk at the location # specified by charm_path. @@ -845,7 +843,7 @@ class NsLcm(LcmBase): if vdu_config and vdu_config.get("juju"): proxy_charm = vdu_config["juju"]["charm"] - if vnf_config["juju"].get("proxy") is False: + if vdu_config["juju"].get("proxy") is False: # native_charm, will be deployed after VM. Skip proxy_charm = None @@ -879,7 +877,7 @@ class NsLcm(LcmBase): ns_config = nsd.get("ns-configuration") if ns_config and ns_config.get("juju"): proxy_charm = ns_config["juju"]["charm"] - if vnf_config["juju"].get("proxy") is False: + if ns_config["juju"].get("proxy") is False: # native_charm, will be deployed after VM. Skip proxy_charm = None @@ -963,7 +961,7 @@ class NsLcm(LcmBase): ssh_public_key n2vc_key_list.append(ssh_public_key) step = "charm ssh-public-key for member_vnf_index={} vdu_id={} is '{}'".format( - vca_deployed["member_vnf_index"], vca_deployed["vdu_id"], ssh_public_key) + vca_deployed["member-vnf-index"], vca_deployed["vdu_id"], ssh_public_key) self.logger.debug(logging_text + step) else: # primitive_status == "failed": if vca_deployed["step"] == "get-ssh-public-key": @@ -1208,8 +1206,8 @@ class NsLcm(LcmBase): for vdu_index, vdu in enumerate(get_iterable(vnfd, 'vdu')): if vdu["id"] == vdu_id: initial_config_primitive_list = vdu['vdu-configuration'].get( - 'initial-config-primitive', ()) - break + 'initial-config-primitive', []) + break else: raise LcmException("Not found vdu_id={} at vnfd:vdu".format(vdu_id)) vdur = db_vnfrs[vnf_index]["vdur"][vdu_index] @@ -1220,7 +1218,7 @@ class NsLcm(LcmBase): add_params["rw_mgmt_ip"] = vdur["ip-address"] else: add_params["rw_mgmt_ip"] = db_vnfrs[vnf_index]["ip-address"] - initial_config_primitive_list = vnfd["vnf-configuration"].get('initial-config-primitive', ()) + initial_config_primitive_list = vnfd["vnf-configuration"].get('initial-config-primitive', []) else: if db_nsr.get("additionalParamsForNs"): add_params = db_nsr["additionalParamsForNs"].copy() @@ -1228,19 +1226,25 @@ class NsLcm(LcmBase): if isinstance(v, str) and v.startswith("!!yaml "): add_params[k] = yaml.safe_load(v[7:]) add_params["rw_mgmt_ip"] = None + initial_config_primitive_list = nsd["ns-configuration"].get('initial-config-primitive', []) # add primitive verify-ssh-credentials to the list after config only when is a vnf or vdu charm initial_config_primitive_list = initial_config_primitive_list.copy() if initial_config_primitive_list and vnf_index: - initial_config_primitive_list.insert(1, {"name": "verify-ssh-credentials", "paramter": []}) + initial_config_primitive_list.insert(1, {"name": "verify-ssh-credentials", "parameter": []}) for initial_config_primitive in initial_config_primitive_list: + primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, add_params) + self.logger.debug(logging_text + step + " primitive '{}' params '{}'" + .format(initial_config_primitive["name"], primitive_params_)) primitive_result, primitive_detail = await self._ns_execute_primitive( db_nsr["_admin"]["deployed"], vnf_index, vdu_id, vdu_name, vdu_count_index, initial_config_primitive["name"], - self._map_primitive_params(initial_config_primitive, {}, add_params)) + primitive_params_, + retries=10 if initial_config_primitive["name"] == "verify-ssh-credentials" else 0, + retries_interval=30) if primitive_result != "COMPLETED": - raise LcmException("charm error executing primitive {} for member_vnf_index={} vdu_id={}: '{}'" + raise LcmException("charm error executing primitive {} for member_vnf_index={} vdu_id={}: '{}'" .format(initial_config_primitive["name"], vca_deployed["member-vnf-index"], vca_deployed["vdu_id"], primitive_detail)) @@ -1391,6 +1395,7 @@ class NsLcm(LcmBase): # waiting all charms are ok configuration_failed = False if number_to_configure: + step = "Waiting all charms are active" old_status = "configuring: init: {}".format(number_to_configure) db_nsr_update["config-status"] = old_status db_nsr_update["detailed-status"] = old_status @@ -1643,6 +1648,12 @@ class NsLcm(LcmBase): vnf_index, seq.get("name"), nslcmop_operation_state_detail)) async def terminate(self, nsr_id, nslcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") db_nsr = None @@ -1655,6 +1666,9 @@ class NsLcm(LcmBase): nslcmop_operation_state = None autoremove = False # autoremove after terminated try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id) + step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) step = "Getting nsr={} from db".format(nsr_id) @@ -1926,7 +1940,7 @@ class NsLcm(LcmBase): return calculated_params async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index, - primitive, primitive_params): + primitive, primitive_params, retries=0, retries_interval=30): start_primitive_time = time() try: for vca_deployed in db_deployed["VCA"]: @@ -1956,34 +1970,47 @@ class NsLcm(LcmBase): await self.n2vc.login() if primitive == "config": primitive_params = {"params": primitive_params} - primitive_id = await self.n2vc.ExecutePrimitive( - model_name, - application_name, - primitive, - callback, - *callback_args, - **primitive_params - ) - while time() - start_primitive_time < self.timeout_primitive: - primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id) - if primitive_result_ in ("completed", "failed"): - primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED" - detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id) - break - elif primitive_result_ is None and primitive == "config": - primitive_result = "COMPLETED" - detailed_result = None + while retries >= 0: + primitive_id = await self.n2vc.ExecutePrimitive( + model_name, + application_name, + primitive, + callback, + *callback_args, + **primitive_params + ) + while time() - start_primitive_time < self.timeout_primitive: + primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id) + if primitive_result_ in ("completed", "failed"): + primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED" + detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id) + break + elif primitive_result_ is None and primitive == "config": + primitive_result = "COMPLETED" + detailed_result = None + break + else: # ("running", "pending", None): + pass + await asyncio.sleep(5) + else: + raise LcmException("timeout after {} seconds".format(self.timeout_primitive)) + if primitive_result == "COMPLETED": break - else: # ("running", "pending", None): - pass - await asyncio.sleep(5) - else: - raise LcmException("timeout after {} seconds".format(self.timeout_primitive)) + retries -= 1 + if retries >= 0: + await asyncio.sleep(retries_interval) + return primitive_result, detailed_result except (N2VCPrimitiveExecutionFailed, LcmException) as e: return "FAILED", str(e) async def action(self, nsr_id, nslcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database @@ -1995,6 +2022,9 @@ class NsLcm(LcmBase): nslcmop_operation_state_detail = None exc = None try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + step = "Getting information from database" db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) @@ -2017,17 +2047,6 @@ class NsLcm(LcmBase): step = "Getting nsd from database" db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]}) - # look if previous tasks in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id) - if task_dependency: - step = db_nslcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - # for backward compatibility if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict): nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values()) @@ -2115,6 +2134,12 @@ class NsLcm(LcmBase): return nslcmop_operation_state, nslcmop_operation_state_detail async def scale(self, nsr_id, nslcmop_id): + + # Try to lock HA task here + task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) + if not task_is_locked_by_me: + return + logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") # get all needed from database @@ -2130,26 +2155,19 @@ class NsLcm(LcmBase): old_config_status = "" vnfr_scaled = False try: + # wait for any previous tasks in process + await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) + step = "Getting nslcmop from database" + self.logger.debug(step + " after having waited for previous tasks to be completed") db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) step = "Getting nsr from database" db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) old_operational_status = db_nsr["operational-status"] old_config_status = db_nsr["config-status"] - - # look if previous tasks in process - task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id) - if task_dependency: - step = db_nslcmop_update["detailed-status"] = \ - "Waiting for related tasks to be completed: {}".format(task_name) - self.logger.debug(logging_text + step) - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - _, pending = await asyncio.wait(task_dependency, timeout=3600) - if pending: - raise LcmException("Timeout waiting related tasks to be completed") - step = "Parsing scaling parameters" + # self.logger.debug(step) db_nsr_update["operational-status"] = "scaling" self.update_db_2("nsrs", nsr_id, db_nsr_update) nsr_deployed = db_nsr["_admin"].get("deployed") @@ -2169,6 +2187,7 @@ class NsLcm(LcmBase): db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}) step = "Getting vnfd from database" db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]}) + step = "Getting scaling-group-descriptor" for scaling_descriptor in db_vnfd["scaling-group-descriptor"]: if scaling_descriptor["name"] == scaling_group: @@ -2176,6 +2195,7 @@ class NsLcm(LcmBase): else: raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present " "at vnfd:scaling-group-descriptor".format(scaling_group)) + # cooldown_time = 0 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()): # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0) @@ -2202,16 +2222,21 @@ class NsLcm(LcmBase): # count if max-instance-count is reached if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None: max_instance_count = int(scaling_descriptor["max-instance-count"]) + + # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(scaling_descriptor["max-instance-count"])) if nb_scale_op >= max_instance_count: - raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the" - " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)) - nb_scale_op = nb_scale_op + 1 + raise LcmException("reached the limit of {} (max-instance-count) " + "scaling-out operations for the " + "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)) + + nb_scale_op += 1 vdu_scaling_info["scaling_direction"] = "OUT" vdu_scaling_info["vdu-create"] = {} for vdu_scale_info in scaling_descriptor["vdu"]: RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index, "type": "create", "count": vdu_scale_info.get("count", 1)}) vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1) + elif scaling_type == "SCALE_IN": # count if min-instance-count is reached min_instance_count = 0 @@ -2220,7 +2245,7 @@ class NsLcm(LcmBase): if nb_scale_op <= min_instance_count: raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the " "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)) - nb_scale_op = nb_scale_op - 1 + nb_scale_op -= 1 vdu_scaling_info["scaling_direction"] = "IN" vdu_scaling_info["vdu-delete"] = {} for vdu_scale_info in scaling_descriptor["vdu"]: @@ -2408,7 +2433,8 @@ class NsLcm(LcmBase): db_nslcmop_update["statusEnteredTime"] = time() db_nslcmop_update["detailed-status"] = "done" db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type) - db_nsr_update["operational-status"] = old_operational_status + db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \ + else old_operational_status db_nsr_update["config-status"] = old_config_status return except (ROclient.ROClientException, DbException, LcmException) as e: