from time import time
from uuid import uuid4
-from random import randint
+from random import SystemRandom
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
+ EE_TLS_NAME = "ee-tls"
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, msg, lcm_tasks, config: LcmCfg, loop):
+ def __init__(self, msg, lcm_tasks, config: LcmCfg):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.db = Database().instance.db
self.fs = Filesystem().instance.fs
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.timeout = config.timeout
self.ro_config = config.RO
# create N2VC connector
self.n2vc = N2VCJujuConnector(
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_n2vc_db,
fs=self.fs,
db=self.db,
self.conn_helm_ee = LCMHelmConn(
log=self.logger,
- loop=self.loop,
vca_config=self.vca_config,
on_update_db=self._on_update_n2vc_db,
)
kubectl_command=self.vca_config.kubectlpath,
juju_command=self.vca_config.jujupath,
log=self.logger,
- loop=self.loop,
on_update_db=self._on_update_k8s_db,
fs=self.fs,
db=self.db,
}
# create RO client
- self.RO = NgRoClient(self.loop, **self.ro_config.to_dict())
+ self.RO = NgRoClient(**self.ro_config.to_dict())
self.op_status_map = {
"instantiation": self.RO.status,
image["vim_info"] = {}
for flavor in target["flavor"]:
flavor["vim_info"] = {}
+ if db_nsr.get("shared-volumes"):
+ target["shared-volumes"] = deepcopy(db_nsr["shared-volumes"])
+ for shared_volumes in target["shared-volumes"]:
+ shared_volumes["vim_info"] = {}
if db_nsr.get("affinity-or-anti-affinity-group"):
target["affinity-or-anti-affinity-group"] = deepcopy(
db_nsr["affinity-or-anti-affinity-group"]
if target_vim not in ns_ags["vim_info"]:
ns_ags["vim_info"][target_vim] = {}
+ # shared-volumes
+ if vdur.get("shared-volumes-id"):
+ for sv_id in vdur["shared-volumes-id"]:
+ ns_sv = find_in_list(
+ target["shared-volumes"], lambda sv: sv_id in sv["id"]
+ )
+ if ns_sv:
+ ns_sv["vim_info"][target_vim] = {}
+
vdur["vim_info"] = {target_vim: {}}
# instantiation parameters
if vnf_params:
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
self._write_op_status(nslcmop_id, stage)
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
"target KDU={} is in error state".format(kdu_name)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
nb_tries += 1
raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
"Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
)
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
# get ip address
if not target_vdu_id:
ee_id, credentials = await self.vca_map[
vca_type
].create_execution_environment(
- namespace=namespace,
+ namespace=nsr_id,
reuse_ee_id=ee_id,
db_dict=db_dict,
config=osm_config,
self.logger.debug(
logging_text + "Invoke and wait for placement optimization"
)
- await self.msg.aiowrite(
- "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
- )
+ await self.msg.aiowrite("pla", "get_placement", {"nslcmopId": nslcmop_id})
db_poll_interval = 5
wait = db_poll_interval * 10
pla_result = None
# create namespace and certificate if any helm based EE is present in the NS
if check_helm_ee_in_ns(db_vnfds):
- # TODO: create EE namespace
+ await self.vca_map["helm-v3"].setup_ns_namespace(
+ name=nsr_id,
+ )
# create TLS certificates
await self.vca_map["helm-v3"].create_tls_certificate(
- secret_name="ee-tls-{}".format(nsr_id),
+ secret_name=self.EE_TLS_NAME,
dns_prefix="*",
nsr_id=nsr_id,
usage="server auth",
+ namespace=nsr_id,
)
nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
stage=stage,
)
+ # Check if each vnf has exporter for metric collection if so update prometheus job records
+ if "exporters-endpoints" in vnfd.get("df")[0]:
+ exporter_config = vnfd.get("df")[0].get("exporters-endpoints")
+ self.logger.debug("exporter config :{}".format(exporter_config))
+ artifact_path = "{}/{}/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ "exporter-endpoint",
+ )
+ ee_id = None
+ ee_config_descriptor = exporter_config
+ vnfr_id = db_vnfr["id"]
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
+ logging_text,
+ nsr_id,
+ vnfr_id,
+ vdu_id=None,
+ vdu_index=None,
+ user=None,
+ pub_key=None,
+ )
+ self.logger.debug("rw_mgmt_ip:{}".format(rw_mgmt_ip))
+ self.logger.debug("Artifact_path:{}".format(artifact_path))
+ db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
+ vdu_id_for_prom = None
+ vdu_index_for_prom = None
+ for x in get_iterable(db_vnfr, "vdur"):
+ vdu_id_for_prom = x.get("vdu-id-ref")
+ vdu_index_for_prom = x.get("count-index")
+ prometheus_jobs = await self.extract_prometheus_scrape_jobs(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ element_type="VDU",
+ vdu_id=vdu_id_for_prom,
+ vdu_index=vdu_index_for_prom,
+ )
+
+ self.logger.debug("Prometheus job:{}".format(prometheus_jobs))
+ if prometheus_jobs:
+ db_nsr_update["_admin.deployed.prometheus_jobs"] = prometheus_jobs
+ self.update_db_2(
+ "nsrs",
+ nsr_id,
+ db_nsr_update,
+ )
+
+ for job in prometheus_jobs:
+ self.db.set_one(
+ "prometheus_jobs",
+ {"job_name": job["job_name"]},
+ job,
+ upsert=True,
+ fail_on_empty=False,
+ )
+
# Check if this NS has a charm configuration
descriptor_config = nsd.get("ns-configuration")
if descriptor_config and descriptor_config.get("juju"):
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
# TODO vdu_index_count
for vca in vca_deployed_list:
if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
- return vca["ee_id"]
+ return vca.get("ee_id")
async def destroy_N2VC(
self,
# Delete Namespace and Certificates if necessary
if check_helm_ee_in_ns(list(db_vnfds_from_member_index.values())):
await self.vca_map["helm-v3"].delete_tls_certificate(
- certificate_name=db_nslcmop["nsInstanceId"],
+ namespace=db_nslcmop["nsInstanceId"],
+ certificate_name=self.EE_TLS_NAME,
+ )
+ await self.vca_map["helm-v3"].delete_namespace(
+ namespace=db_nslcmop["nsInstanceId"],
)
- # TODO: Delete namespace
# Delete from k8scluster
stage[1] = "Deleting KDUs."
"operationState": nslcmop_operation_state,
"autoremove": autoremove,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
)
)
# wait and retry
- await asyncio.sleep(retries_interval, loop=self.loop)
+ await asyncio.sleep(retries_interval)
else:
if isinstance(e, asyncio.TimeoutError):
e = N2VCException(
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
},
- loop=self.loop,
)
except Exception as e:
self.logger.error(
and member_vnf_index
):
msg.update({"vnf_member_index": member_vnf_index})
- await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
+ await self.msg.aiowrite("ns", change_type, msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "scaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
)
if not job_file:
return
+ self.logger.debug("Artifact path{}".format(artifact_path))
+ self.logger.debug("job file{}".format(job_file))
with self.fs.file_open((artifact_path, job_file), "r") as f:
job_data = f.read()
kdur_name = kdur.get("name")
break
- await asyncio.sleep(10, loop=self.loop)
+ await asyncio.sleep(10)
else:
if vdu_id and vdu_index is not None:
raise LcmException(
)
# TODO get_service
- _, _, service = ee_id.partition(".") # remove prefix "namespace."
- host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
- host_port = "80"
- vnfr_id = vnfr_id.replace("-", "")
- variables = {
- "JOB_NAME": vnfr_id,
- "TARGET_IP": target_ip,
- "EXPORTER_POD_IP": host_name,
- "EXPORTER_POD_PORT": host_port,
- "NSR_ID": nsr_id,
- "VNF_MEMBER_INDEX": vnf_member_index,
- "VDUR_NAME": vdur_name,
- "KDUR_NAME": kdur_name,
- "ELEMENT_TYPE": element_type,
- }
+ if ee_id is not None:
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ "NSR_ID": nsr_id,
+ "VNF_MEMBER_INDEX": vnf_member_index,
+ "VDUR_NAME": vdur_name,
+ "KDUR_NAME": kdur_name,
+ "ELEMENT_TYPE": element_type,
+ }
+ else:
+ metric_path = ee_config_descriptor["metric-path"]
+ target_port = ee_config_descriptor["metric-port"]
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "TARGET_PORT": target_port,
+ "METRIC_PATH": metric_path,
+ }
+
job_list = parse_job(job_data, variables)
# ensure job_name is using the vnfr_id. Adding the metadata nsr_id
for job in job_list:
not isinstance(job.get("job_name"), str)
or vnfr_id not in job["job_name"]
):
- job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["job_name"] = vnfr_id + "_" + str(SystemRandom().randint(1, 10000))
job["nsr_id"] = nsr_id
job["vnfr_id"] = vnfr_id
return job_list
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "migrated", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
for target_vdu in target_vdu_list:
deploy_params_vdu = target_vdu
# Set run-day1 vnf level value if not vdu level value exists
- if not deploy_params_vdu.get("run-day1") and target_vnf[
- "additionalParams"
- ].get("run-day1"):
+ if not deploy_params_vdu.get("run-day1") and target_vnf.get(
+ "additionalParams", {}
+ ).get("run-day1"):
deploy_params_vdu["run-day1"] = target_vnf[
"additionalParams"
].get("run-day1")
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "healed", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "healed", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)
self.logger.debug("Wait Heal RO > {}".format(operational_status_ro))
if operational_status_ro != "healing":
break
- await asyncio.sleep(15, loop=self.loop)
+ await asyncio.sleep(15)
else: # timeout_ns_deploy
raise NgRoException("Timeout waiting ns to deploy")
"nslcmop_id": nslcmop_id,
"operationState": nslcmop_operation_state,
}
- await self.msg.aiowrite("ns", "verticalscaled", msg, loop=self.loop)
+ await self.msg.aiowrite("ns", "verticalscaled", msg)
except Exception as e:
self.logger.error(
logging_text + "kafka_write notification Exception {}".format(e)