from osm_common.fsbase import FsException
from n2vc.n2vc_juju_conn import N2VCJujuConnector
-from n2vc.exceptions import N2VCException
+from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
from copy import copy, deepcopy
from http import HTTPStatus
from time import time
from uuid import uuid4
+from functools import partial
__author__ = "Alfonso Tierno"
# write to database
self.update_db_2("nsrs", nsr_id, db_dict)
+ except (asyncio.CancelledError, asyncio.TimeoutError):
+ raise
except Exception as e:
self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
- return
-
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
"""
Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
format(vnfd["id"], vdu["id"], e))
- def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
+ def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list):
"""
Creates a RO ns descriptor from OSM ns_instantiate params
:param ns_params: OSM instantiate params
# Check for and optionally request placement optimization. Database will be updated if placement activated
stage[2] = "Waiting for Placement."
- await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
+ await self._do_placement(logging_text, db_nslcmop, db_vnfrs)
# deploy RO
await asyncio.wait(task_dependency, timeout=3600)
stage[2] = "Checking instantiation parameters."
- RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
+ RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list)
stage[2] = "Deploying ns at VIM."
db_nsr_update["detailed-status"] = " ".join(stage)
self.update_db_2("nsrs", nsr_id, db_nsr_update)
namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
element_type = 'VDU'
element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ elif kdu_name:
+ namespace += ".{}".format(kdu_name)
+ element_type = 'KDU'
+ element_under_configuration = kdu_name
# Get artifact path
self.fs.sync() # Sync from FSMongo
ee_id_parts = ee_id.split('.')
model_name = ee_id_parts[0]
application_name = ee_id_parts[1]
- self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name,
- db_update_entry + "application": application_name,
- db_update_entry + "ee_id": ee_id})
+ db_nsr_update = {db_update_entry + "model": model_name,
+ db_update_entry + "application": application_name,
+ db_update_entry + "ee_id": ee_id}
# n2vc_redesign STEP 3.3
vca_index=vca_index,
status='INSTALLING SW',
element_under_configuration=element_under_configuration,
- element_type=element_type
+ element_type=element_type,
+ other_update=db_nsr_update
)
# TODO check if already done
self.logger.debug(logging_text + step)
- await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
+ config = None
+ if not is_proxy_charm:
+ initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
+ if initial_config_primitive_list:
+ for primitive in initial_config_primitive_list:
+ if primitive["name"] == "config":
+ config = self._map_primitive_params(
+ primitive,
+ {},
+ deploy_params
+ )
+ break
+ await self.n2vc.install_configuration_sw(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ db_dict=db_dict,
+ config=config
+ )
# write in db flag of configuration_sw already installed
self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
except DbException as e:
self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
- def _write_all_config_status(self, nsr_id: str, status: str):
+ def _write_all_config_status(self, db_nsr: dict, status: str):
try:
- # nsrs record
- db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ nsr_id = db_nsr["_id"]
# configurationStatus
config_status = db_nsr.get('configurationStatus')
if config_status:
+ db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
+ enumerate(config_status) if v}
# update status
- db_dict = dict()
- db_dict['configurationStatus'] = list()
- for c in config_status:
- c['status'] = status
- db_dict['configurationStatus'].append(c)
- self.update_db_2("nsrs", nsr_id, db_dict)
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
except DbException as e:
self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
- element_under_configuration: str = None, element_type: str = None):
+ element_under_configuration: str = None, element_type: str = None,
+ other_update: dict = None):
# self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
# .format(vca_index, status))
try:
db_path = 'configurationStatus.{}.'.format(vca_index)
- db_dict = dict()
+ db_dict = other_update or {}
if status:
db_dict[db_path + 'status'] = status
if element_under_configuration:
self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
.format(status, nsr_id, vca_index, e))
- async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ """
+ Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
+ sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
+ Database is used because the result can be obtained from a different LCM worker in case of HA.
+ :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
+ :param db_nslcmop: database content of nslcmop
+ :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
+ :return: None. Modifies database vnfrs and parameter db_vnfr with the computed 'vim-account-id'
+ """
+ nslcmop_id = db_nslcmop['_id']
placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
if placement_engine == "PLA":
- self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
- await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)
+ self.logger.debug(logging_text + "Invoke and wait for placement optimization")
+ await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
db_poll_interval = 5
- wait = db_poll_interval * 4
+ wait = db_poll_interval * 10
pla_result = None
while not pla_result and wait >= 0:
await asyncio.sleep(db_poll_interval)
wait -= db_poll_interval
- db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
if not pla_result:
- raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))
+ raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
for pla_vnf in pla_result['vnf']:
vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
if not pla_vnf.get('vimAccountId') or not vnfr:
continue
self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
+ # Modifies db_vnfrs
+ vnfr["vim-account-id"] = pla_vnf['vimAccountId']
return
def update_nsrs_with_pla_result(self, params):
self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
return False
+ def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
+ """
+ callback for kdu install intended to store the returned kdu_instance at database
+ :return: None
+ """
+ db_update = {}
+ try:
+ result = task.result()
+ if on_done:
+ db_update[on_done] = str(result)
+ except Exception as e:
+ if on_exc:
+ db_update[on_exc] = str(e)
+ if db_update:
+ try:
+ self.update_db_2(item, _id, db_update)
+ except Exception:
+ pass
+
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
# Launch kdus if present in the descriptor
for kdur in get_iterable(vnfr_data, "kdur"):
desc_params = self._format_additional_params(kdur.get("additionalParams"))
vnfd_id = vnfr_data.get('vnfd-id')
- pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
+ namespace = kdur.get("k8s-namespace")
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
k8sclustertype = "helm-chart"
format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
# check if kdumodel is a file and exists
try:
- # path format: /vnfdid/pkkdir/kdumodel
- filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype, kdumodel)
- if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
- kdumodel = self.fs.path + filename
- except asyncio.CancelledError:
+ storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
+ if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
+ # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
+ filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["'pkg-dir"], k8sclustertype,
+ kdumodel)
+ if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
+ kdumodel = self.fs.path + filename
+ except (asyncio.TimeoutError, asyncio.CancelledError):
raise
except Exception: # it is not a file
pass
"k8scluster-type": k8sclustertype,
"member-vnf-index": vnfr_data["member-vnf-index-ref"],
"kdu-name": kdur["kdu-name"],
- "kdu-model": kdumodel}
- db_nsr_update["_admin.deployed.K8s.{}".format(index)] = k8s_instace_info
+ "kdu-model": kdumodel,
+ "namespace": namespace}
+ db_path = "_admin.deployed.K8s.{}".format(index)
+ db_nsr_update[db_path] = k8s_instace_info
self.update_db_2("nsrs", nsr_id, db_nsr_update)
db_dict = {"collection": "nsrs",
"filter": {"_id": nsr_id},
- "path": "_admin.deployed.K8s.{}".format(index)}
+ "path": db_path}
task = asyncio.ensure_future(
self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
atomic=True, params=desc_params,
db_dict=db_dict, timeout=600,
- kdu_name=kdur["kdu-name"]))
+ kdu_name=kdur["kdu-name"], namespace=namespace))
+ task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
+ on_done=db_path + ".kdu-instance",
+ on_exc=db_path + ".detailed-status"))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
# sub-operations
- def _reintent_or_skip_suboperation(self, db_nslcmop, op_index):
- op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
- if (op.get('operationState') == 'COMPLETED'):
+ def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
+ op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
+ if op.get('operationState') == 'COMPLETED':
# b. Skip sub-operation
# _ns_execute_primitive() or RO.create_action() will NOT be executed
return self.SUBOPERATION_STATUS_SKIP
'lcmOperationType': operationType
}
op_index = self._find_suboperation(db_nslcmop, match)
- if (op_index == self.SUBOPERATION_STATUS_NOT_FOUND):
+ if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
# a. New sub-operation
# The sub-operation does not exist, add it.
# _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
vdu_id = None
vdu_count_index = None
vdu_name = None
- if (RO_nsr_id and RO_scaling_info):
+ if RO_nsr_id and RO_scaling_info:
vnf_config_primitive = None
primitive_params = None
else:
else:
# Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
# or op_index (operationState != 'COMPLETED')
- return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
+ return self._retry_or_skip_suboperation(db_nslcmop, op_index)
# Function to return execution_environment id
if destroy_ee:
await self.n2vc.delete_execution_environment(vca_deployed["ee_id"])
- async def _delete_N2VC(self, nsr_id: str):
- self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
- namespace = "." + nsr_id
- await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
- self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
+ async def _delete_all_N2VC(self, db_nsr: dict):
+ self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
+ namespace = "." + db_nsr["_id"]
+ try:
+ await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
+ except N2VCNotFound: # already deleted. Skip
+ pass
+ self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
"""
# remove All execution environments at once
stage[0] = "Stage 3/3 delete all."
- stage[1] = "Deleting all execution environments."
- self.logger.debug(logging_text + stage[1])
- task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_N2VC(nsr_id=nsr_id),
- timeout=self.timeout_charm_delete))
- # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
- tasks_dict_info[task_delete_ee] = "Terminating all VCA"
+ if nsr_deployed.get("VCA"):
+ stage[1] = "Deleting all execution environments."
+ self.logger.debug(logging_text + stage[1])
+ task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
+ timeout=self.timeout_charm_delete))
+ # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
+ tasks_dict_info[task_delete_ee] = "Terminating all VCA"
# Delete from k8scluster
stage[1] = "Deleting KDUs."
else:
desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
+ if kdu_name:
+ kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False
+
# TODO check if ns is in a proper status
- if kdu_name and primitive in ("upgrade", "rollback", "status"):
+ if kdu_name and (primitive in ("upgrade", "rollback", "status") or kdu_action):
# kdur and desc_params already set from before
if primitive_params:
desc_params.update(primitive_params)
cluster_uuid=kdu.get("k8scluster-uuid"),
kdu_instance=kdu.get("kdu-instance")),
timeout=timeout_ns_action)
+ else:
+ kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
+ params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
+
+ detailed_status = await asyncio.wait_for(
+ self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
+ cluster_uuid=kdu.get("k8scluster-uuid"),
+ kdu_instance=kdu_instance,
+ primitive_name=primitive,
+ params=params, db_dict=db_dict,
+ timeout=timeout_ns_action),
+ timeout=timeout_ns_action)
if detailed_status:
nslcmop_operation_state = 'COMPLETED'
else:
detailed_status = ''
nslcmop_operation_state = 'FAILED'
-
else:
nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
self._look_for_deployed_vca(nsr_deployed["VCA"],
detailed_status))
return # database update is called inside finally
- except (DbException, LcmException, N2VCException) as e:
+ except (DbException, LcmException, N2VCException, K8sException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
except asyncio.CancelledError: