nsr_id=nsr_id,
db_nsr=db_nsr,
db_vnfrs=db_vnfrs,
+ db_vnfds=db_vnfds
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
return False
- async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
+ async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds):
# Launch kdus if present in the descriptor
deployed_ok = True
k8sclustertype = None
error_text = None
cluster_uuid = None
+ vnfd_id = vnfr_data.get('vnfd-id')
+ pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
k8sclustertype = "chart"
else:
error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
" running"
+ # check if kdumodel is a file and exists
+ try:
+ # path format: /vnfdid/pkkdir/kdumodel
+ filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype_full, kdumodel)
+ if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
+ kdumodel = self.fs.path + filename
+ except Exception:
+ # it is not a file
+ pass
try:
if not error_text:
cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
db_nsr = self.db.get_list("nsrs")[1]
db_vnfr = self.db.get_list("vnfrs")[2]
db_vnfrs = {"multikdu": db_vnfr}
+ db_vnfd = self.db.get_list("vnfds")[1]
+ db_vnfds = {db_vnfd["_id"]: db_vnfd}
nsr_id = db_nsr["_id"]
# nslcmop_id = self.db.get_list("nslcmops")[1]["_id"]
logging_text = "KDU"
self.my_ns.k8sclusterhelm.install = asynctest.CoroutineMock(return_value="k8s_id")
- await self.my_ns.deploy_kdus(logging_text, nsr_id, db_nsr, db_vnfrs)
+ await self.my_ns.deploy_kdus(logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds)
db_nsr = self.db.get_list("nsrs")[1]
self.assertIn("K8s", db_nsr["_admin"]["deployed"], "K8s entry not created at '_admin.deployed'")
self.assertIsInstance(db_nsr["_admin"]["deployed"]["K8s"], list, "K8s entry is not of type list")