+ try:
+ db_path = "configurationStatus.{}.".format(vca_index)
+ db_dict = other_update or {}
+ if status:
+ db_dict[db_path + "status"] = status
+ if element_under_configuration:
+ db_dict[
+ db_path + "elementUnderConfiguration"
+ ] = element_under_configuration
+ if element_type:
+ db_dict[db_path + "elementType"] = element_type
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except DbException as e:
+ self.logger.warn(
+ "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
+ status, nsr_id, vca_index, e
+ )
+ )
+
+ async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ """
+ Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
+ sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
+ Database is used because the result can be obtained from a different LCM worker in case of HA.
+ :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
+ :param db_nslcmop: database content of nslcmop
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
+ :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
+ computed 'vim-account-id'
+ """
+ modified = False
+ nslcmop_id = db_nslcmop["_id"]
+ placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
+ if placement_engine == "PLA":
+ self.logger.debug(
+ logging_text + "Invoke and wait for placement optimization"
+ )
+ await self.msg.aiowrite(
+ "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
+ )
+ db_poll_interval = 5
+ wait = db_poll_interval * 10
+ pla_result = None
+ while not pla_result and wait >= 0:
+ await asyncio.sleep(db_poll_interval)
+ wait -= db_poll_interval
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
+
+ if not pla_result:
+ raise LcmException(
+ "Placement timeout for nslcmopId={}".format(nslcmop_id)
+ )
+
+ for pla_vnf in pla_result["vnf"]:
+ vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
+ if not pla_vnf.get("vimAccountId") or not vnfr:
+ continue
+ modified = True
+ self.db.set_one(
+ "vnfrs",
+ {"_id": vnfr["_id"]},
+ {"vim-account-id": pla_vnf["vimAccountId"]},
+ )
+ # Modifies db_vnfrs
+ vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
+ return modified
+
+ def update_nsrs_with_pla_result(self, params):
+ try:
+ nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
+ self.update_db_2(
+ "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
+ )
+ except Exception as e:
+ self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
+
+ async def instantiate(self, nsr_id, nslcmop_id):
+ """
+
+ :param nsr_id: ns instance to deploy
+ :param nslcmop_id: operation to run
+ :return:
+ """
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
+ if not task_is_locked_by_me:
+ self.logger.debug(
+ "instantiate() task is not locked by me, ns={}".format(nsr_id)
+ )
+ return
+
+ logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
+ self.logger.debug(logging_text + "Enter")
+
+ # get all needed from database
+
+ # database nsrs record
+ db_nsr = None
+
+ # database nslcmops record
+ db_nslcmop = None
+
+ # update operation on nsrs
+ db_nsr_update = {}
+ # update operation on nslcmops
+ db_nslcmop_update = {}
+
+ nslcmop_operation_state = None
+ db_vnfrs = {} # vnf's info indexed by member-index
+ # n2vc_info = {}
+ tasks_dict_info = {} # from task to info text
+ exc = None
+ error_list = []
+ stage = [
+ "Stage 1/5: preparation of the environment.",
+ "Waiting for previous operations to terminate.",
+ "",
+ ]
+ # ^ stage, step, VIM progress
+ try:
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
+
+ # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
+ stage[1] = "Reading from database."
+ # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
+ db_nsr_update["detailed-status"] = "creating"
+ db_nsr_update["operational-status"] = "init"
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state="BUILDING",
+ current_operation="INSTANTIATING",
+ current_operation_id=nslcmop_id,
+ other_update=db_nsr_update,
+ )
+ self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
+
+ # read from db: operation
+ stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
+ db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
+ db_nslcmop["operationParams"]["additionalParamsForVnf"]
+ )
+ ns_params = db_nslcmop.get("operationParams")
+ if ns_params and ns_params.get("timeout_ns_deploy"):
+ timeout_ns_deploy = ns_params["timeout_ns_deploy"]
+ else:
+ timeout_ns_deploy = self.timeout.get(
+ "ns_deploy", self.timeout_ns_deploy
+ )
+
+ # read from db: ns
+ stage[1] = "Getting nsr={} from db.".format(nsr_id)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+ self.fs.sync(db_nsr["nsd-id"])
+ db_nsr["nsd"] = nsd
+ # nsr_name = db_nsr["name"] # TODO short-name??
+
+ # read from db: vnf's of this ns
+ stage[1] = "Getting vnfrs from db."
+ self.logger.debug(logging_text + stage[1])
+ db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+
+ # read from db: vnfd's for every vnf
+ db_vnfds = [] # every vnfd data
+
+ # for each vnf in ns, read vnfd
+ for vnfr in db_vnfrs_list:
+ if vnfr.get("kdur"):
+ kdur_list = []
+ for kdur in vnfr["kdur"]:
+ if kdur.get("additionalParams"):
+ kdur["additionalParams"] = json.loads(
+ kdur["additionalParams"]
+ )
+ kdur_list.append(kdur)
+ vnfr["kdur"] = kdur_list
+
+ db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
+ vnfd_id = vnfr["vnfd-id"]
+ vnfd_ref = vnfr["vnfd-ref"]
+ self.fs.sync(vnfd_id)
+
+ # if we haven't this vnfd, read it from db
+ if vnfd_id not in db_vnfds:
+ # read from db
+ stage[1] = "Getting vnfd={} id='{}' from db.".format(
+ vnfd_id, vnfd_ref
+ )
+ self.logger.debug(logging_text + stage[1])
+ vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
+
+ # store vnfd
+ db_vnfds.append(vnfd)
+
+ # Get or generates the _admin.deployed.VCA list
+ vca_deployed_list = None
+ if db_nsr["_admin"].get("deployed"):
+ vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
+ if vca_deployed_list is None:
+ vca_deployed_list = []
+ configuration_status_list = []
+ db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
+ db_nsr_update["configurationStatus"] = configuration_status_list
+ # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
+ populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
+ elif isinstance(vca_deployed_list, dict):
+ # maintain backward compatibility. Change a dict to list at database
+ vca_deployed_list = list(vca_deployed_list.values())
+ db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
+ populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
+
+ if not isinstance(
+ deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
+ ):
+ populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
+ db_nsr_update["_admin.deployed.RO.vnfd"] = []
+
+ # set state to INSTANTIATED. When instantiated NBI will not delete directly
+ db_nsr_update["_admin.nsState"] = "INSTANTIATED"
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self.db.set_list(
+ "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
+ )
+
+ # n2vc_redesign STEP 2 Deploy Network Scenario
+ stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
+ self._write_op_status(op_id=nslcmop_id, stage=stage)
+
+ stage[1] = "Deploying KDUs."
+ # self.logger.debug(logging_text + "Before deploy_kdus")
+ # Call to deploy_kdus in case exists the "vdu:kdu" param
+ await self.deploy_kdus(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ nslcmop_id=nslcmop_id,
+ db_vnfrs=db_vnfrs,
+ db_vnfds=db_vnfds,
+ task_instantiation_info=tasks_dict_info,
+ )
+
+ stage[1] = "Getting VCA public key."
+ # n2vc_redesign STEP 1 Get VCA public ssh-key
+ # feature 1429. Add n2vc public key to needed VMs
+ n2vc_key = self.n2vc.get_public_key()
+ n2vc_key_list = [n2vc_key]
+ if self.vca_config.get("public_key"):
+ n2vc_key_list.append(self.vca_config["public_key"])
+
+ stage[1] = "Deploying NS at VIM."
+ task_ro = asyncio.ensure_future(
+ self.instantiate_RO(
+ logging_text=logging_text,
+ nsr_id=nsr_id,
+ nsd=nsd,
+ db_nsr=db_nsr,
+ db_nslcmop=db_nslcmop,
+ db_vnfrs=db_vnfrs,
+ db_vnfds=db_vnfds,
+ n2vc_key_list=n2vc_key_list,
+ stage=stage,
+ )
+ )
+ self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
+ tasks_dict_info[task_ro] = "Deploying at VIM"
+
+ # n2vc_redesign STEP 3 to 6 Deploy N2VC
+ stage[1] = "Deploying Execution Environments."
+ self.logger.debug(logging_text + stage[1])
+
+ nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
+ for vnf_profile in get_vnf_profiles(nsd):
+ vnfd_id = vnf_profile["vnfd-id"]
+ vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
+ member_vnf_index = str(vnf_profile["id"])
+ db_vnfr = db_vnfrs[member_vnf_index]
+ base_folder = vnfd["_admin"]["storage"]
+ vdu_id = None
+ vdu_index = 0