vnfr_id=vnfr_id,
nsr_id=nsr_id,
target_ip=rw_mgmt_ip,
+ element_type=element_type,
vnf_member_index=db_vnfr.get("member-vnf-index-ref", ""),
vdu_id=vdu_id,
vdu_index=vdu_index,
cached_vnfds: Dict[str, Any],
) -> List[Relation]:
relations = []
+ if vca.target_element == "ns":
+ self.logger.debug("VCA is a NS charm, not a VNF.")
+ return relations
vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
vnf_profile_id = vnf_profile["id"]
vnfd_id = vnf_profile["vnfd-id"]
vnfr_id: str,
nsr_id: str,
target_ip: str,
+ element_type: str,
vnf_member_index: str = "",
vdu_id: str = "",
vdu_index: int = None,
vnfr_id (str): VNFR ID where this EE applies
nsr_id (str): NSR ID where this EE applies
target_ip (str): VDU/KDU instance IP address
+ element_type (str): NS or VNF or VDU or KDU
vnf_member_index (str, optional): VNF index where this EE applies. Defaults to "".
vdu_id (str, optional): VDU ID where this EE applies. Defaults to "".
vdu_index (int, optional): VDU index where this EE applies. Defaults to None.
Returns:
_type_: Prometheus jobs
"""
- self.logger.debug(f"KDU: {kdu_name}; KDU INDEX: {kdu_index}")
+ # default the vdur and kdur names to an empty string, to avoid any later
+ # problem with Prometheus when the element type is not VDU or KDU
+ vdur_name = ""
+ kdur_name = ""
+
# look if exist a file called 'prometheus*.j2' and
artifact_content = self.fs.dir_ls(artifact_path)
job_file = next(
with self.fs.file_open((artifact_path, job_file), "r") as f:
job_data = f.read()
- vdur_name = ""
- kdur_name = ""
- for r in range(360):
- db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
- if vdu_id and vdu_index is not None:
- vdur = next(
- (
- x
- for x in get_iterable(db_vnfr, "vdur")
- if (
- x.get("vdu-id-ref") == vdu_id
- and x.get("count-index") == vdu_index
- )
- ),
- {},
- )
- if vdur.get("name"):
- vdur_name = vdur.get("name")
- break
- if kdu_name and kdu_index is not None:
- kdur = next(
- (
- x
- for x in get_iterable(db_vnfr, "kdur")
- if (
- x.get("kdu-name") == kdu_name
- and x.get("count-index") == kdu_index
- )
- ),
- {},
- )
- if kdur.get("name"):
- kdur_name = kdur.get("name")
- break
+ # obtain the VDUR or KDUR, if the element type is VDU or KDU
+ if element_type in ("VDU", "KDU"):
+ for _ in range(360):
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ if vdu_id and vdu_index is not None:
+ vdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "vdur")
+ if (
+ x.get("vdu-id-ref") == vdu_id
+ and x.get("count-index") == vdu_index
+ )
+ ),
+ {},
+ )
+ if vdur.get("name"):
+ vdur_name = vdur.get("name")
+ break
+ if kdu_name and kdu_index is not None:
+ kdur = next(
+ (
+ x
+ for x in get_iterable(db_vnfr, "kdur")
+ if (
+ x.get("kdu-name") == kdu_name
+ and x.get("count-index") == kdu_index
+ )
+ ),
+ {},
+ )
+ if kdur.get("name"):
+ kdur_name = kdur.get("name")
+ break
- await asyncio.sleep(10, loop=self.loop)
- else:
- if vdu_id and vdu_index is not None:
- raise LcmException(
- f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
- )
- if kdu_name and kdu_index is not None:
- raise LcmException(
- f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
- )
+ await asyncio.sleep(10, loop=self.loop)
+ else:
+ if vdu_id and vdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting VDU with name={vdu_id} and index={vdu_index} to be intantiated"
+ )
+ if kdu_name and kdu_index is not None:
+ raise LcmException(
+ f"Timeout waiting KDU with name={kdu_name} and index={kdu_index} to be intantiated"
+ )
# TODO get_service
_, _, service = ee_id.partition(".") # remove prefix "namespace."
"VNF_MEMBER_INDEX": vnf_member_index,
"VDUR_NAME": vdur_name,
"KDUR_NAME": kdur_name,
+ "ELEMENT_TYPE": element_type,
}
job_list = parse_job(job_data, variables)
# ensure job_name is using the vnfr_id. Adding the metadata nsr_id