+ async def ns_scale(self, nsr_id, nslcmop_id):
+ logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+ # get all needed from database
+ db_nsr = None
+ db_nslcmop = None
+ db_nslcmop_update = {}
+ db_nsr_update = {}
+ exc = None
+ try:
+ step = "Getting nslcmop from database"
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ step = "Getting nsr from database"
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ step = "Parsing scaling parameters"
+ nsr_lcm = db_nsr["_admin"].get("deployed")
+ RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
+ vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
+ scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
+ scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
+ scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
+
+ step = "Getting vnfr from database"
+ db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
+ step = "Getting vnfd from database"
+ db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
+ step = "Getting scaling-group-descriptor"
+ for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
+ if scaling_descriptor["name"] == scaling_group:
+ break
+ else:
+ raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
+ "at vnfd:scaling-group-descriptor".format(scaling_group))
+ cooldown_time = 0
+ for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
+ cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
+ if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
+ break
+
+ # TODO check if ns is in a proper status
+ step = "Sending scale order to RO"
+ nb_scale_op = 0
+ if not db_nsr["_admin"].get("scaling-group"):
+ self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
+ admin_scale_index = 0
+ else:
+ for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
+ if admin_scale_info["name"] == scaling_group:
+ nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
+ break
+ RO_scaling_info = []
+ vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
+ if scaling_type == "SCALE_OUT":
+ # count if max-instance-count is reached
+ if "max-instance-count" in scaling_descriptor and scaling_descriptor["max-instance-count"] is not None:
+ max_instance_count = int(scaling_descriptor["max-instance-count"])
+ if nb_scale_op >= max_instance_count:
+ raise LcmException("reached the limit of {} (max-instance-count) scaling-out operations for the"
+ " scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
+ nb_scale_op = nb_scale_op + 1
+ vdu_scaling_info["scaling_direction"] = "OUT"
+ vdu_scaling_info["vdu-create"] = {}
+ for vdu_scale_info in scaling_descriptor["vdu"]:
+ RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
+ "type": "create", "count": vdu_scale_info.get("count", 1)})
+ vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
+ elif scaling_type == "SCALE_IN":
+ # count if min-instance-count is reached
+ if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
+ min_instance_count = int(scaling_descriptor["min-instance-count"])
+ if nb_scale_op <= min_instance_count:
+ raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
+ "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
+ nb_scale_op = nb_scale_op - 1
+ vdu_scaling_info["scaling_direction"] = "IN"
+ vdu_scaling_info["vdu-delete"] = {}
+ for vdu_scale_info in scaling_descriptor["vdu"]:
+ RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
+ "type": "delete", "count": vdu_scale_info.get("count", 1)})
+ vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
+
+ # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
+ if vdu_scaling_info["scaling_direction"] == "IN":
+ for vdur in reversed(db_vnfr["vdur"]):
+ if vdu_scaling_info["vdu-delete"].get(vdur["vdu-id-ref"]):
+ vdu_scaling_info["vdu-delete"][vdur["vdu-id-ref"]] -= 1
+ vdu_scaling_info["vdu"].append({
+ "name": vdur["name"],
+ "vdu_id": vdur["vdu-id-ref"],
+ "interface": []
+ })
+ for interface in vdur["interfaces"]:
+ vdu_scaling_info["vdu"][-1]["interface"].append({
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ })
+ del vdu_scaling_info["vdu-delete"]
+
+ # execute primitive service PRE-SCALING
+ step = "Executing pre-scale vnf-config-primitive"
+ if scaling_descriptor.get("scaling-config-action"):
+ for scaling_config_action in scaling_descriptor["scaling-config-action"]:
+ if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "pre-scale-in" \
+ and scaling_type == "SCALE_IN":
+ vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
+ step = db_nslcmop_update["detailed-status"] = \
+ "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
+ # look for primitive
+ primitive_params = {}
+ for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
+ if config_primitive["name"] == vnf_config_primitive:
+ for parameter in config_primitive.get("parameter", ()):
+ if 'default-value' in parameter and \
+ parameter['default-value'] == "<VDU_SCALE_INFO>":
+ primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
+ default_flow_style=True,
+ width=256)
+ break
+ else:
+ raise LcmException(
+ "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
+ "[vnf-config-primitive-name-ref='{}'] does not match any vnf-cnfiguration:config-"
+ "primitive".format(scaling_group, config_primitive))
+ result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
+ vnf_config_primitive, primitive_params)
+ self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
+ vnf_config_primitive, result, result_detail))
+ if result == "FAILED":
+ raise LcmException(result_detail)
+
+ if RO_scaling_info:
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO_desc = await RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
+ db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
+ db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
+ # TODO mark db_nsr_update as scaling
+ # wait until ready
+ RO_nslcmop_id = RO_desc["instance_action_id"]
+ db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
+
+ RO_task_done = False
+ step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
+ detailed_status_old = None
+ self.logger.debug(logging_text + step)
+
+ deployment_timeout = 1 * 3600 # One hours
+ while deployment_timeout > 0:
+ if not RO_task_done:
+ desc = await RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
+ extra_item_id=RO_nslcmop_id)
+ ns_status, ns_status_info = RO.check_action_status(desc)
+ if ns_status == "ERROR":
+ raise ROclient.ROClientException(ns_status_info)
+ elif ns_status == "BUILD":
+ detailed_status = step + "; {}".format(ns_status_info)
+ elif ns_status == "ACTIVE":
+ RO_task_done = True
+ step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
+ self.logger.debug(logging_text + step)
+ else:
+ assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
+ else:
+ desc = await RO.show("ns", RO_nsr_id)
+ ns_status, ns_status_info = RO.check_ns_status(desc)
+ if ns_status == "ERROR":
+ raise ROclient.ROClientException(ns_status_info)
+ elif ns_status == "BUILD":
+ detailed_status = step + "; {}".format(ns_status_info)
+ elif ns_status == "ACTIVE":
+ step = detailed_status = "Waiting for management IP address reported by the VIM"
+ try:
+ desc = await RO.show("ns", RO_nsr_id)
+ nsr_lcm["nsr_ip"] = RO.get_ns_vnf_info(desc)
+ break
+ except ROclient.ROClientException as e:
+ if e.http_code != 409: # IP address is not ready return code is 409 CONFLICT
+ raise e
+ else:
+ assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
+ if detailed_status != detailed_status_old:
+ detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
+ self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+
+ await asyncio.sleep(5, loop=self.loop)
+ deployment_timeout -= 5
+ if deployment_timeout <= 0:
+ raise ROclient.ROClientException("Timeout waiting ns to be ready")
+
+ step = "Updating VNFRs"
+ # self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, ns_RO_info)
+ self.ns_update_vnfr_2({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
+
+ # update VDU_SCALING_INFO with the obtained ip_addresses
+ if vdu_scaling_info["scaling_direction"] == "OUT":
+ for vdur in reversed(db_vnfr["vdur"]):
+ if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
+ vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
+ vdu_scaling_info["vdu"].append({
+ "name": vdur["name"],
+ "vdu_id": vdur["vdu-id-ref"],
+ "interface": []
+ })
+ for interface in vdur["interfaces"]:
+ vdu_scaling_info["vdu"][-1]["interface"].append({
+ "name": interface["name"],
+ "ip_address": interface["ip-address"],
+ "mac_address": interface.get("mac-address"),
+ })
+ del vdu_scaling_info["vdu-create"]
+
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ # execute primitive service POST-SCALING
+ step = "Executing post-scale vnf-config-primitive"
+ if scaling_descriptor.get("scaling-config-action"):
+ for scaling_config_action in scaling_descriptor["scaling-config-action"]:
+ if scaling_config_action.get("trigger") and scaling_config_action["trigger"] == "post-scale-out" \
+ and scaling_type == "SCALE_OUT":
+ vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
+ step = db_nslcmop_update["detailed-status"] = \
+ "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
+ # look for primitive
+ primitive_params = {}
+ for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
+ if config_primitive["name"] == vnf_config_primitive:
+ for parameter in config_primitive.get("parameter", ()):
+ if 'default-value' in parameter and \
+ parameter['default-value'] == "<VDU_SCALE_INFO>":
+ primitive_params[parameter["name"]] = yaml.safe_dump(vdu_scaling_info,
+ default_flow_style=True,
+ width=256)
+ break
+ else:
+ raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
+ "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
+ "match any vnf-cnfiguration:config-primitive".format(scaling_group,
+ config_primitive))
+ result, result_detail = await self._ns_execute_primitive(nsr_lcm, vnf_index,
+ vnf_config_primitive, primitive_params)
+ self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
+ vnf_config_primitive, result, result_detail))
+ if result == "FAILED":
+ raise LcmException(result_detail)
+
+ db_nslcmop_update["operationState"] = "COMPLETED"
+ db_nslcmop_update["statusEnteredTime"] = time()
+ db_nslcmop_update["detailed-status"] = "done"
+ db_nsr_update["detailed-status"] = "done"
+ try:
+ await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
+ if cooldown_time:
+ await asyncio.sleep(cooldown_time)
+ await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
+ except Exception as e:
+ self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
+ self.logger.debug(logging_text + "Exit Ok")
+ return
+ except (ROclient.ROClientException, DbException, LcmException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
+ finally:
+ if exc:
+ db_nsr_update = None
+ if db_nslcmop:
+ db_nslcmop_update = {
+ "detailed-status": "FAILED {}: {}".format(step, exc),
+ "operationState": "FAILED",
+ "statusEnteredTime": time(),
+ }
+ if db_nslcmop_update:
+ self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+ if db_nsr_update:
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+