+ deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
+ if db_nsr.get("additionalParamsForNs"):
+ deploy_params.update(
+ parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
+ )
+ base_folder = nsd["_admin"]["storage"]
+ self._deploy_n2vc(
+ logging_text=logging_text,
+ db_nsr=db_nsr,
+ db_vnfr=db_vnfr,
+ nslcmop_id=nslcmop_id,
+ nsr_id=nsr_id,
+ nsi_id=nsi_id,
+ vnfd_id=vnfd_id,
+ vdu_id=vdu_id,
+ kdu_name=kdu_name,
+ member_vnf_index=member_vnf_index,
+ vdu_index=vdu_index,
+ vdu_name=vdu_name,
+ deploy_params=deploy_params,
+ descriptor_config=descriptor_config,
+ base_folder=base_folder,
+ task_instantiation_info=tasks_dict_info,
+ stage=stage,
+ )
+
+ # rest of staff will be done at finally
+
+ except (
+ ROclient.ROClientException,
+ DbException,
+ LcmException,
+ N2VCException,
+ ) as e:
+ self.logger.error(
+ logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
+ )
+ exc = e
+ except asyncio.CancelledError:
+ self.logger.error(
+ logging_text + "Cancelled Exception while '{}'".format(stage[1])
+ )
+ exc = "Operation was cancelled"
+ except Exception as e:
+ exc = traceback.format_exc()
+ self.logger.critical(
+ logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
+ exc_info=True,
+ )
+ finally:
+ if exc:
+ error_list.append(str(exc))
+ try:
+ # wait for pending tasks
+ if tasks_dict_info:
+ stage[1] = "Waiting for instantiate pending tasks."
+ self.logger.debug(logging_text + stage[1])
+ error_list += await self._wait_for_tasks(
+ logging_text,
+ tasks_dict_info,
+ timeout_ns_deploy,
+ stage,
+ nslcmop_id,
+ nsr_id=nsr_id,
+ )
+ stage[1] = stage[2] = ""
+ except asyncio.CancelledError:
+ error_list.append("Cancelled")
+ # TODO cancel all tasks
+ except Exception as exc:
+ error_list.append(str(exc))
+
+ # update operation-status
+ db_nsr_update["operational-status"] = "running"
+ # let's begin with VCA 'configured' status (later we can change it)
+ db_nsr_update["config-status"] = "configured"
+ for task, task_name in tasks_dict_info.items():
+ if not task.done() or task.cancelled() or task.exception():
+ if task_name.startswith(self.task_name_deploy_vca):
+ # A N2VC task is pending
+ db_nsr_update["config-status"] = "failed"
+ else:
+ # RO or KDU task is pending
+ db_nsr_update["operational-status"] = "failed"
+
+ # update status at database
+ if error_list:
+ error_detail = ". ".join(error_list)
+ self.logger.error(logging_text + error_detail)
+ error_description_nslcmop = "{} Detail: {}".format(
+ stage[0], error_detail
+ )
+ error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
+ nslcmop_id, stage[0]
+ )
+
+ db_nsr_update["detailed-status"] = (
+ error_description_nsr + " Detail: " + error_detail
+ )
+ db_nslcmop_update["detailed-status"] = error_detail
+ nslcmop_operation_state = "FAILED"
+ ns_state = "BROKEN"
+ else:
+ error_detail = None
+ error_description_nsr = error_description_nslcmop = None
+ ns_state = "READY"
+ db_nsr_update["detailed-status"] = "Done"
+ db_nslcmop_update["detailed-status"] = "Done"
+ nslcmop_operation_state = "COMPLETED"
+
+ if db_nsr:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation="IDLE",
+ current_operation_id=None,
+ error_description=error_description_nsr,
+ error_detail=error_detail,
+ other_update=db_nsr_update,
+ )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage="",
+ error_message=error_description_nslcmop,
+ operation_state=nslcmop_operation_state,
+ other_update=db_nslcmop_update,
+ )
+
+ if nslcmop_operation_state:
+ try:
+ await self.msg.aiowrite(
+ "ns",
+ "instantiated",
+ {
+ "nsr_id": nsr_id,
+ "nslcmop_id": nslcmop_id,
+ "operationState": nslcmop_operation_state,
+ },
+ loop=self.loop,
+ )
+ except Exception as e:
+ self.logger.error(
+ logging_text + "kafka_write notification Exception {}".format(e)
+ )
+
+ self.logger.debug(logging_text + "Exit")
+ self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
+
+ async def _add_vca_relations(
+ self,
+ logging_text,
+ nsr_id,
+ vca_index: int,
+ timeout: int = 3600,
+ vca_type: str = None,
+ vca_id: str = None,
+ ) -> bool:
+
+ # steps:
+ # 1. find all relations for this VCA
+ # 2. wait for other peers related
+ # 3. add relations
+
+ try:
+ vca_type = vca_type or "lxc_proxy_charm"
+
+ # STEP 1: find all relations for this VCA
+
+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
+
+ # this VCA data
+ my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
+
+ # read all ns-configuration relations
+ ns_relations = list()
+ db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
+ if db_ns_relations:
+ for r in db_ns_relations:
+ # check if this VCA is in the relation
+ if my_vca.get("member-vnf-index") in (
+ r.get("entities")[0].get("id"),
+ r.get("entities")[1].get("id"),
+ ):
+ ns_relations.append(r)
+
+ # read all vnf-configuration relations
+ vnf_relations = list()
+ db_vnfd_list = db_nsr.get("vnfd-id")
+ if db_vnfd_list:
+ for vnfd in db_vnfd_list:
+ db_vnf_relations = None
+ db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
+ db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
+ if db_vnf_configuration:
+ db_vnf_relations = db_vnf_configuration.get("relation", [])
+ if db_vnf_relations:
+ for r in db_vnf_relations:
+ # check if this VCA is in the relation
+ if my_vca.get("vdu_id") in (
+ r.get("entities")[0].get("id"),
+ r.get("entities")[1].get("id"),
+ ):
+ vnf_relations.append(r)
+
+ # if no relations, terminate
+ if not ns_relations and not vnf_relations:
+ self.logger.debug(logging_text + " No relations")
+ return True
+
+ self.logger.debug(
+ logging_text
+ + " adding relations\n {}\n {}".format(
+ ns_relations, vnf_relations
+ )
+ )
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + " : timeout adding relations")
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deloyed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each defined NS relation, find the VCA's related
+ for r in ns_relations.copy():
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
+ for vca in vca_list:
+ if vca.get("member-vnf-index") == r.get("entities")[0].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ from_vca_ee_id = vca.get("ee_id")
+ from_vca_endpoint = r.get("entities")[0].get("endpoint")
+ if vca.get("member-vnf-index") == r.get("entities")[1].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ to_vca_ee_id = vca.get("ee_id")
+ to_vca_endpoint = r.get("entities")[1].get("endpoint")
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint,
+ vca_id=vca_id,
+ )
+ # remove entry from relations list
+ ns_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get("configurationStatus")
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get("member-vnf-index") == r.get("entities")[
+ 0
+ ].get("id"):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get("member-vnf-index") == r.get("entities")[
+ 1
+ ].get("id"):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # for each defined VNF relation, find the VCA's related
+ for r in vnf_relations.copy():
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
+ for vca in vca_list:
+ key_to_check = "vdu_id"
+ if vca.get("vdu_id") is None:
+ key_to_check = "vnfd_id"
+ if vca.get(key_to_check) == r.get("entities")[0].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ from_vca_ee_id = vca.get("ee_id")
+ from_vca_endpoint = r.get("entities")[0].get("endpoint")
+ if vca.get(key_to_check) == r.get("entities")[1].get(
+ "id"
+ ) and vca.get("config_sw_installed"):
+ to_vca_ee_id = vca.get("ee_id")
+ to_vca_endpoint = r.get("entities")[1].get("endpoint")
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.vca_map[vca_type].add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint,
+ vca_id=vca_id,
+ )
+ # remove entry from relations list
+ vnf_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get("configurationStatus")
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get("vdu_id") == r.get("entities")[0].get(
+ "id"
+ ):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ vnf_relations.remove(r)
+ if vca.get("vdu_id") == r.get("entities")[1].get(
+ "id"
+ ):
+ if vca_status.get("status") == "BROKEN":
+ # peer broken: remove relation from list
+ vnf_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # wait for next try
+ await asyncio.sleep(5.0)
+
+ if not ns_relations and not vnf_relations:
+ self.logger.debug("Relations added")
+ break
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
+ return False