import logging
import logging.handlers
import traceback
+import json
from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
from osm_lcm import ROclient
-from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get
+from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
from n2vc.k8s_helm_conn import K8sHelmConnector
from n2vc.k8s_juju_conn import K8sJujuConnector
from osm_common.fsbase import FsException
from n2vc.n2vc_juju_conn import N2VCJujuConnector
+from n2vc.exceptions import N2VCException
from copy import copy, deepcopy
from http import HTTPStatus
__author__ = "Alfonso Tierno"
-def get_iterable(in_dict, in_key):
- """
- Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
- :param in_dict: a dictionary
- :param in_key: the key to look for at in_dict
- :return: in_dict[in_var] or () if it is None or not present
- """
- if not in_dict.get(in_key):
- return ()
- return in_dict[in_key]
-
-
-def populate_dict(target_dict, key_list, value):
- """
- Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
- Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
- :param target_dict: dictionary to be changed
- :param key_list: list of keys to insert at target_dict
- :param value:
- :return: None
- """
- for key in key_list[0:-1]:
- if key not in target_dict:
- target_dict[key] = {}
- target_dict = target_dict[key]
- target_dict[key_list[-1]] = value
-
-
class NsLcm(LcmBase):
timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
- total_deploy_timeout = 2 * 3600 # global timeout for deployment
+ timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 10 * 60 # timeout for primitive execution
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
- def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
+ def __init__(self, db, msg, fs, lcm_tasks, config, loop):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.loop = loop
self.lcm_tasks = lcm_tasks
- self.ro_config = ro_config
- self.vca_config = vca_config
- if 'pubkey' in self.vca_config:
- self.vca_config['public_key'] = self.vca_config['pubkey']
- if 'cacert' in self.vca_config:
- self.vca_config['ca_cert'] = self.vca_config['cacert']
- if 'apiproxy' in self.vca_config:
- self.vca_config['api_proxy'] = self.vca_config['apiproxy']
+ self.timeout = config["timeout"]
+ self.ro_config = config["ro_config"]
+ self.vca_config = config["VCA"].copy()
# create N2VC connector
self.n2vc = N2VCJujuConnector(
url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
username=self.vca_config.get('user', None),
vca_config=self.vca_config,
- on_update_db=self._on_update_n2vc_db,
- # ca_cert=self.vca_config.get('cacert'),
- # api_proxy=self.vca_config.get('apiproxy'),
+ on_update_db=self._on_update_n2vc_db
)
self.k8sclusterhelm = K8sHelmConnector(
# create RO client
self.RO = ROclient.ROClient(self.loop, **self.ro_config)
- def _on_update_n2vc_db(self, table, filter, path, updated_data):
+ def _on_update_ro_db(self, nsrs_id, ro_descriptor):
+
+ # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
+
+ try:
+ # TODO filter RO descriptor fields...
+
+ # write to database
+ db_dict = dict()
+ # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
+ db_dict['deploymentStatus'] = ro_descriptor
+ self.update_db_2("nsrs", nsrs_id, db_dict)
+
+ except Exception as e:
+ self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
+
+ async def _on_update_n2vc_db(self, table, filter, path, updated_data):
+
+ # remove last dot from path (if exists)
+ if path.endswith('.'):
+ path = path[:-1]
+
+ # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
+ # .format(table, filter, path, updated_data))
+
+ try:
+
+ nsr_id = filter.get('_id')
+
+ # read ns record from database
+ nsr = self.db.get_one(table='nsrs', q_filter=filter)
+ current_ns_status = nsr.get('nsState')
- self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
- .format(table, filter, path, updated_data))
+ # get vca status for NS
+ status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
+
+ # vcaStatus
+ db_dict = dict()
+ db_dict['vcaStatus'] = status_dict
+
+ # update configurationStatus for this VCA
+ try:
+ vca_index = int(path[path.rfind(".")+1:])
+
+ vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
+ vca_status = vca_list[vca_index].get('status')
+
+ configuration_status_list = nsr.get('configurationStatus')
+ config_status = configuration_status_list[vca_index].get('status')
+
+ if config_status == 'BROKEN' and vca_status != 'failed':
+ db_dict['configurationStatus'][vca_index] = 'READY'
+ elif config_status != 'BROKEN' and vca_status == 'failed':
+ db_dict['configurationStatus'][vca_index] = 'BROKEN'
+ except Exception as e:
+ # not update configurationStatus
+ self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
+
+ # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
+ # if nsState = 'DEGRADED' check if all is OK
+ is_degraded = False
+ if current_ns_status in ('READY', 'DEGRADED'):
+ error_description = ''
+ # check machines
+ if status_dict.get('machines'):
+ for machine_id in status_dict.get('machines'):
+ machine = status_dict.get('machines').get(machine_id)
+ # check machine agent-status
+ if machine.get('agent-status'):
+ s = machine.get('agent-status').get('status')
+ if s != 'started':
+ is_degraded = True
+ error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
+ # check machine instance status
+ if machine.get('instance-status'):
+ s = machine.get('instance-status').get('status')
+ if s != 'running':
+ is_degraded = True
+ error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
+ # check applications
+ if status_dict.get('applications'):
+ for app_id in status_dict.get('applications'):
+ app = status_dict.get('applications').get(app_id)
+ # check application status
+ if app.get('status'):
+ s = app.get('status').get('status')
+ if s != 'active':
+ is_degraded = True
+ error_description += 'application {} status={} ; '.format(app_id, s)
+
+ if error_description:
+ db_dict['errorDescription'] = error_description
+ if current_ns_status == 'READY' and is_degraded:
+ db_dict['nsState'] = 'DEGRADED'
+ if current_ns_status == 'DEGRADED' and not is_degraded:
+ db_dict['nsState'] = 'READY'
+
+ # write to database
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
+ except Exception as e:
+ self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
return
- # write NS status to database
- # try:
- # # nsrs_id = filter.get('_id')
- # # print(nsrs_id)
- # # get ns record
- # nsr = self.db.get_one(table=table, q_filter=filter)
- # # get VCA deployed list
- # vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
- # # get RO deployed
- # # ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO'))
- # for vca in vca_list:
- # # status = vca.get('status')
- # # print(status)
- # # detailed_status = vca.get('detailed-status')
- # # print(detailed_status)
- # # for ro in ro_list:
- # # print(ro)
- #
- # except Exception as e:
- # self.logger.error('Error writing NS status to db: {}'.format(e))
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
"""
if isinstance(vld_params["ns-net"], dict):
for vld_id, instance_scenario_id in vld_params["ns-net"].items():
RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
- if RO_vld_ns_net:
- populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
+ populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
if "vnfd-connection-point-ref" in vld_params:
for cp_params in vld_params["vnfd-connection-point-ref"]:
# look for interface
else:
raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
- @staticmethod
- def _get_ns_config_info(vca_deployed_list):
+ def _get_ns_config_info(self, nsr_id):
"""
Generates a mapping between vnf,vdu elements and the N2VC id
- :param vca_deployed_list: List of database _admin.deploy.VCA that contains this list
+ :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
:return: a dictionary with {osm-config-mapping: {}} where its element contains:
"<member-vnf-index>": <N2VC-id> for a vnf configuration, or
"<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
"""
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
mapping = {}
ns_config_info = {"osm-config-mapping": mapping}
for vca in vca_deployed_list:
start_deploy = time()
vdu_flag = False # If any of the VNFDs has VDUs
ns_params = db_nslcmop.get("operationParams")
+ if ns_params and ns_params.get("timeout_ns_deploy"):
+ timeout_ns_deploy = ns_params["timeout_ns_deploy"]
+ else:
+ timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
+
+ # Check for and optionally request placement optimization. Database will be updated if placement activated
+ await self.do_placement(logging_text, db_nslcmop, db_vnfrs)
# deploy RO
db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# create nsd at RO
nsd_ref = nsd["id"]
db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# Crate ns at RO
# if present use it unless in error status
step = db_nsr_update["_admin.deployed.RO.detailed-status"] = "Looking for existing ns at RO"
# self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
desc = await self.RO.show("ns", RO_nsr_id)
+
except ROclient.ROClientException as e:
if e.http_code != HTTPStatus.NOT_FOUND:
raise
db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
# wait until NS is ready
step = ns_status_detailed = detailed_status = "Waiting VIM to deploy ns. RO_ns_id={}".format(RO_nsr_id)
detailed_status_old = None
self.logger.debug(logging_text + step)
- while time() <= start_deploy + self.total_deploy_timeout:
+ old_desc = None
+ while time() <= start_deploy + timeout_ns_deploy:
desc = await self.RO.show("ns", RO_nsr_id)
- ns_status, ns_status_info = self.RO.check_ns_status(desc)
- db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
- if ns_status == "ERROR":
- raise ROclient.ROClientException(ns_status_info)
- elif ns_status == "BUILD":
- detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
- elif ns_status == "ACTIVE":
- step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
- try:
- if vdu_flag:
- self.ns_update_vnfr(db_vnfrs, desc)
- break
- except LcmExceptionNoMgmtIP:
- pass
- else:
- assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
- if detailed_status != detailed_status_old:
- detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
- self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
- await asyncio.sleep(5, loop=self.loop)
- else: # total_deploy_timeout
+
+ # deploymentStatus
+ if desc != old_desc:
+ # desc has changed => update db
+ self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
+ old_desc = desc
+
+ ns_status, ns_status_info = self.RO.check_ns_status(desc)
+ db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
+ if ns_status == "ERROR":
+ raise ROclient.ROClientException(ns_status_info)
+ elif ns_status == "BUILD":
+ detailed_status = ns_status_detailed + "; {}".format(ns_status_info)
+ elif ns_status == "ACTIVE":
+ step = detailed_status = "Waiting for management IP address reported by the VIM. Updating VNFRs"
+ try:
+ if vdu_flag:
+ self.ns_update_vnfr(db_vnfrs, desc)
+ break
+ except LcmExceptionNoMgmtIP:
+ pass
+ else:
+ assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
+ if detailed_status != detailed_status_old:
+ detailed_status_old = db_nsr_update["_admin.deployed.RO.detailed-status"] = detailed_status
+ self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ await asyncio.sleep(5, loop=self.loop)
+ else: # timeout_ns_deploy
raise ROclient.ROClientException("Timeout waiting ns to be ready")
step = "Updating NSR"
db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
db_nsr_update["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
- self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
+ # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
step = "Deployed at VIM"
self.logger.debug(logging_text + step)
ip_address = None
nb_tries = 0
target_vdu_id = None
+ ro_retries = 0
while True:
+ ro_retries += 1
+ if ro_retries >= 360: # 1 hour
+ raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
+
await asyncio.sleep(10, loop=self.loop)
# wait until NS is deployed at RO
if not ro_nsr_id:
# get ip address
if not target_vdu_id:
db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
- if not vdu_id:
+
+ if not vdu_id: # for the VNF case
ip_address = db_vnfr.get("ip-address")
if not ip_address:
continue
- for vdur in get_iterable(db_vnfr, "vdur"):
- if (vdur["vdu-id-ref"] == vdu_id and vdur["count-index"] == vdu_index) or \
- (ip_address and vdur.get("ip-address") == ip_address):
- if vdur.get("status") == "ACTIVE":
- target_vdu_id = vdur["vdu-id-ref"]
- elif vdur.get("status") == "ERROR":
- raise LcmException("Cannot inject ssh-key because target VM is in error state")
- break
- else:
+ vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
+ else: # VDU case
+ vdur = next((x for x in get_iterable(db_vnfr, "vdur")
+ if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
+
+ if not vdur:
raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format(
vnfr_id, vdu_id, vdu_index
))
+ if vdur.get("status") == "ACTIVE":
+ ip_address = vdur.get("ip-address")
+ if not ip_address:
+ continue
+ target_vdu_id = vdur["vdu-id-ref"]
+ elif vdur.get("status") == "ERROR":
+ raise LcmException("Cannot inject ssh-key because target VM is in error state")
+
if not target_vdu_id:
continue
return ip_address
- async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id,
- kdu_name, vdu_index, config_descriptor, deploy_params, base_folder):
+ async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
+ """
+ Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
+ """
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # vdu or kdu: no dependencies
+ return
+ timeout = 300
+ while timeout >= 0:
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
+ configuration_status_list = db_nsr["configurationStatus"]
+ for index, vca_deployed in enumerate(configuration_status_list):
+ if index == vca_index:
+ # myself
+ continue
+ if not my_vca.get("member-vnf-index") or \
+ (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
+ internal_status = configuration_status_list[index].get("status")
+ if internal_status == 'READY':
+ continue
+ elif internal_status == 'BROKEN':
+ raise LcmException("Configuration aborted because dependent charm/s has failed")
+ else:
+ break
+ else:
+ # no dependencies, return
+ return
+ await asyncio.sleep(10)
+ timeout -= 1
+
+ raise LcmException("Configuration aborted because dependent charm/s timeout")
+
+ async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name,
+ vdu_index, config_descriptor, deploy_params, base_folder, nslcmop_id):
nsr_id = db_nsr["_id"]
db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
'filter': {'_id': nsr_id},
'path': db_update_entry
}
- logging_text += "member_vnf_index={} vdu_id={}, vdu_index={} ".format(db_vnfr["member-vnf-index-ref"],
- vdu_id, vdu_index)
-
step = ""
try:
+
+ element_type = 'NS'
+ element_under_configuration = nsr_id
+
vnfr_id = None
if db_vnfr:
vnfr_id = db_vnfr["_id"]
namespace = "{nsi}.{ns}".format(
nsi=nsi_id if nsi_id else "",
ns=nsr_id)
+
if vnfr_id:
- namespace += "." + vnfr_id
+ element_type = 'VNF'
+ element_under_configuration = vnfr_id
+ namespace += ".{}".format(vnfr_id)
if vdu_id:
namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
+ element_type = 'VDU'
+ element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
# Get artifact path
- artifact_path = "/{}/{}/charms/{}".format(
+ self.fs.sync() # Sync from FSMongo
+ artifact_path = "{}/{}/charms/{}".format(
base_folder["folder"],
base_folder["pkg-dir"],
config_descriptor["juju"]["charm"]
# create or register execution environment in VCA
if is_proxy_charm:
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='CREATING',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
+
step = "create execution environment"
self.logger.debug(logging_text + step)
ee_id, credentials = await self.n2vc.create_execution_environment(namespace=namespace,
reuse_ee_id=ee_id,
db_dict=db_dict)
+
else:
step = "Waiting to VM being up and getting IP address"
self.logger.debug(logging_text + step)
credentials["username"] = username
# n2vc_redesign STEP 3.2
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='REGISTERING',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
+
step = "register execution environment {}".format(credentials)
self.logger.debug(logging_text + step)
ee_id = await self.n2vc.register_execution_environment(credentials=credentials, namespace=namespace,
# n2vc_redesign STEP 3.3
step = "Install configuration Software"
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='INSTALLING SW',
+ element_under_configuration=element_under_configuration,
+ element_type=element_type
+ )
+
# TODO check if already done
self.logger.debug(logging_text + step)
await self.n2vc.install_configuration_sw(ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict)
+ # write in db flag of configuration_sw already installed
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
+
+ # add relations for this VCA (wait for other peers related with this VCA)
+ await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id, vca_index=vca_index)
+
# if SSH access is required, then get execution environment SSH public
if is_proxy_charm: # if native charm we have waited already to VM be UP
pub_key = None
step = "Install configuration Software, getting public ssh key"
pub_key = await self.n2vc.get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
- step = "Insert public key into VM"
+ step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
else:
step = "Waiting to VM being up and getting IP address"
self.logger.debug(logging_text + step)
# n2vc_redesign STEP 5.1
# wait for RO (ip-address) Insert pub_key into VM
- rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
- user=user, pub_key=pub_key)
+ if vnfr_id:
+ rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
+ user=user, pub_key=pub_key)
+ else:
+ rw_mgmt_ip = None # This is for a NS configuration
self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
# sort initial config primitives by 'seq'
- try:
- initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
- except Exception as e:
- self.logger.error(logging_text + step + ": " + str(e))
+ if initial_config_primitive_list:
+ try:
+ initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
+ except Exception as e:
+ self.logger.error(logging_text + step + ": " + str(e))
+ else:
+ self.logger.debug(logging_text + step + ": No initial-config-primitive")
# add config if not present for NS charm
initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
vca_deployed)
+ # wait for dependent primitives execution (NS -> VNF -> VDU)
+ if initial_config_primitive_list:
+ await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
+
+ # stage, in function of element type: vdu, kdu, vnf or ns
+ my_vca = vca_deployed_list[vca_index]
+ if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
+ # VDU or KDU
+ stage = 'Stage 3/5: running Day-1 primitives for VDU'
+ elif my_vca.get("member-vnf-index"):
+ # VNF
+ stage = 'Stage 4/5: running Day-1 primitives for VNF'
+ else:
+ # NS
+ stage = 'Stage 5/5: running Day-1 primitives for NS'
+
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='EXECUTING PRIMITIVE'
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage=stage
+ )
+
for initial_config_primitive in initial_config_primitive_list:
# adding information on the vca_deployed if it is a NS execution environment
if not vca_deployed["member-vnf-index"]:
- deploy_params["ns_config_info"] = self._get_ns_config_info(vca_deployed_list)
+ deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
# TODO check if already done
primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
params_dict=primitive_params_,
db_dict=db_dict
)
+
# TODO register in database that primitive is done
step = "instantiated at VCA"
self.logger.debug(logging_text + step)
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='READY'
+ )
+
except Exception as e: # TODO not use Exception but N2VC exception
+ # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
+ self._write_configuration_status(
+ nsr_id=nsr_id,
+ vca_index=vca_index,
+ status='BROKEN'
+ )
raise Exception("{} {}".format(step, e)) from e
# TODO raise N2VC exception with 'step' extra information
+ def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
+ error_description: str = None):
+ try:
+ db_dict = dict()
+ if ns_state:
+ db_dict["nsState"] = ns_state
+ db_dict["currentOperation"] = current_operation
+ db_dict["currentOperationID"] = current_operation_id
+ db_dict["errorDescription"] = error_description
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except Exception as e:
+ self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
+
+ def _write_op_status(self, op_id: str, stage: str = None, error_message: str = None, queuePosition: int = 0):
+ try:
+ db_dict = dict()
+ db_dict['queuePosition'] = queuePosition
+ db_dict['stage'] = stage
+ if error_message:
+ db_dict['errorMessage'] = error_message
+ self.update_db_2("nslcmops", op_id, db_dict)
+ except Exception as e:
+ self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
+
+ def _write_all_config_status(self, nsr_id: str, status: str):
+ try:
+ # nsrs record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ # configurationStatus
+ config_status = db_nsr.get('configurationStatus')
+ if config_status:
+ # update status
+ db_dict = dict()
+ db_dict['configurationStatus'] = list()
+ for c in config_status:
+ c['status'] = status
+ db_dict['configurationStatus'].append(c)
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
+ except Exception as e:
+ self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
+
+ def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
+ element_under_configuration: str = None, element_type: str = None):
+
+ # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
+ # .format(vca_index, status))
+
+ try:
+ db_path = 'configurationStatus.{}.'.format(vca_index)
+ db_dict = dict()
+ if status:
+ db_dict[db_path + 'status'] = status
+ if element_under_configuration:
+ db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
+ if element_type:
+ db_dict[db_path + 'elementType'] = element_type
+ self.update_db_2("nsrs", nsr_id, db_dict)
+ except Exception as e:
+ self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
+ .format(status, nsr_id, vca_index, e))
+
+ async def do_placement(self, logging_text, db_nslcmop, db_vnfrs):
+ placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
+ if placement_engine == "PLA":
+ self.logger.debug(logging_text + "Invoke placement optimization for nslcmopId={}".format(db_nslcmop['id']))
+ await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': db_nslcmop['_id']}, loop=self.loop)
+ db_poll_interval = 5
+ wait = db_poll_interval * 4
+ pla_result = None
+ while not pla_result and wait >= 0:
+ await asyncio.sleep(db_poll_interval)
+ wait -= db_poll_interval
+ db_nslcmop = self.db.get_one("nslcmops", {"_id": db_nslcmop["_id"]})
+ pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
+
+ if not pla_result:
+ raise LcmException("Placement timeout for nslcmopId={}".format(db_nslcmop['id']))
+
+ for pla_vnf in pla_result['vnf']:
+ vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
+ if not pla_vnf.get('vimAccountId') or not vnfr:
+ continue
+ self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
+ return
+
+ def update_nsrs_with_pla_result(self, params):
+ try:
+ nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
+ self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
+ except Exception as e:
+ self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
+
async def instantiate(self, nsr_id, nslcmop_id):
"""
# Try to lock HA task here
task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
if not task_is_locked_by_me:
- self.logger.debug('instantiate() task is not locked by me')
+ self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
return
logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
db_vnfrs = {} # vnf's info indexed by member-index
# n2vc_info = {}
task_instantiation_list = []
+ task_instantiation_info = {} # from task to info text
exc = None
try:
# wait for any previous tasks in process
# STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
+ # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state="BUILDING",
+ current_operation="INSTANTIATING",
+ current_operation_id=nslcmop_id
+ )
+
# read from db: operation
step = "Getting nslcmop={} from db".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
+ ns_params = db_nslcmop.get("operationParams")
+ if ns_params and ns_params.get("timeout_ns_deploy"):
+ timeout_ns_deploy = ns_params["timeout_ns_deploy"]
+ else:
+ timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
# read from db: ns
step = "Getting nsr={} from db".format(nsr_id)
db_vnfds = {} # every vnfd data indexed by vnf id
db_vnfds_index = {} # every vnfd data indexed by vnf member-index
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage='Stage 1/5: preparation of the environment',
+ queuePosition=0
+ )
+
# for each vnf in ns, read vnfd
for vnfr in db_vnfrs_list:
db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
# if we haven't this vnfd, read it from db
if vnfd_id not in db_vnfds:
- # read from cb
+ # read from db
step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
self.logger.debug(logging_text + step)
vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
if vca_deployed_list is None:
vca_deployed_list = []
+ configuration_status_list = []
db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
+ db_nsr_update["configurationStatus"] = configuration_status_list
# add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
elif isinstance(vca_deployed_list, dict):
# set state to INSTANTIATED. When instantiated NBI will not delete directly
db_nsr_update["_admin.nsState"] = "INSTANTIATED"
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ # n2vc_redesign STEP 2 Deploy Network Scenario
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ stage='Stage 2/5: deployment of VMs and execution environments'
+ )
+
self.logger.debug(logging_text + "Before deploy_kdus")
# Call to deploy_kdus in case exists the "vdu:kdu" param
task_kdu = asyncio.ensure_future(
nsr_id=nsr_id,
db_nsr=db_nsr,
db_vnfrs=db_vnfrs,
+ db_vnfds=db_vnfds
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDUs", task_kdu)
+ task_instantiation_info[task_kdu] = "Deploy KDUs"
task_instantiation_list.append(task_kdu)
# n2vc_redesign STEP 1 Get VCA public ssh-key
# feature 1429. Add n2vc public key to needed VMs
if self.vca_config.get("public_key"):
n2vc_key_list.append(self.vca_config["public_key"])
- # n2vc_redesign STEP 2 Deploy Network Scenario
task_ro = asyncio.ensure_future(
self.instantiate_RO(
logging_text=logging_text,
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
+ task_instantiation_info[task_ro] = "Deploy at VIM"
task_instantiation_list.append(task_ro)
# n2vc_redesign STEP 3 to 6 Deploy N2VC
- step = "Looking for needed vnfd to configure with proxy charm"
+ step = "Deploying proxy and native charms"
self.logger.debug(logging_text + step)
nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
descriptor_config = vnfd.get("vnf-configuration")
if descriptor_config and descriptor_config.get("juju"):
self._deploy_n2vc(
- logging_text=logging_text,
+ logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
- task_instantiation_list=task_instantiation_list
+ task_instantiation_list=task_instantiation_list,
+ task_instantiation_info=task_instantiation_info
)
# Deploy charms for each VDU that supports one.
for vdu_index in range(int(vdud.get("count", 1))):
# TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
self._deploy_n2vc(
- logging_text=logging_text,
+ logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
+ member_vnf_index, vdu_id, vdu_index),
db_nsr=db_nsr,
db_vnfr=db_vnfr,
nslcmop_id=nslcmop_id,
deploy_params=deploy_params_vdu,
descriptor_config=descriptor_config,
base_folder=base_folder,
- task_instantiation_list=task_instantiation_list
+ task_instantiation_list=task_instantiation_list,
+ task_instantiation_info=task_instantiation_info
)
for kdud in get_iterable(vnfd, 'kdu'):
kdu_name = kdud["name"]
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
- task_instantiation_list=task_instantiation_list
+ task_instantiation_list=task_instantiation_list,
+ task_instantiation_info=task_instantiation_info
)
# Check if this NS has a charm configuration
deploy_params=deploy_params,
descriptor_config=descriptor_config,
base_folder=base_folder,
- task_instantiation_list=task_instantiation_list
+ task_instantiation_list=task_instantiation_list,
+ task_instantiation_info=task_instantiation_info
)
# Wait until all tasks of "task_instantiation_list" have been finished
- # while time() <= start_deploy + self.total_deploy_timeout:
- error_text = None
- timeout = 3600 # time() - start_deploy
- task_instantiation_set = set(task_instantiation_list) # build a set with tasks
- done = None
- pending = None
- if len(task_instantiation_set) > 0:
- done, pending = await asyncio.wait(task_instantiation_set, timeout=timeout)
- if pending:
- error_text = "timeout"
- for task in done:
- if task.cancelled():
- if not error_text:
- error_text = "cancelled"
- elif task.done():
- exc = task.exception()
- if exc:
- error_text = str(exc)
+ error_text_list = []
+
+ # let's begin with all OK
+ instantiated_ok = True
+ # let's begin with RO 'running' status (later we can change it)
+ db_nsr_update["operational-status"] = "running"
+ # let's begin with VCA 'configured' status (later we can change it)
+ db_nsr_update["config-status"] = "configured"
+
+ step = "Waiting for tasks to be finished"
+ if task_instantiation_list:
+ # wait for all tasks completion
+ done, pending = await asyncio.wait(task_instantiation_list, timeout=timeout_ns_deploy)
+
+ for task in pending:
+ instantiated_ok = False
+ if task in (task_ro, task_kdu):
+ # RO or KDU task is pending
+ db_nsr_update["operational-status"] = "failed"
+ else:
+ # A N2VC task is pending
+ db_nsr_update["config-status"] = "failed"
+ self.logger.error(logging_text + task_instantiation_info[task] + ": Timeout")
+ error_text_list.append(task_instantiation_info[task] + ": Timeout")
+ for task in done:
+ if task.cancelled():
+ instantiated_ok = False
+ if task in (task_ro, task_kdu):
+ # RO or KDU task was cancelled
+ db_nsr_update["operational-status"] = "failed"
+ else:
+ # A N2VC was cancelled
+ db_nsr_update["config-status"] = "failed"
+ self.logger.warn(logging_text + task_instantiation_info[task] + ": Cancelled")
+ error_text_list.append(task_instantiation_info[task] + ": Cancelled")
+ else:
+ exc = task.exception()
+ if exc:
+ instantiated_ok = False
+ if task in (task_ro, task_kdu):
+ # RO or KDU task raised an exception
+ db_nsr_update["operational-status"] = "failed"
+ else:
+ # A N2VC task raised an exception
+ db_nsr_update["config-status"] = "failed"
+ self.logger.error(logging_text + task_instantiation_info[task] + ": Failed")
- if error_text:
- db_nsr_update["config-status"] = "failed"
- error_text = "fail configuring " + error_text
+ if isinstance(exc, (N2VCException, ROclient.ROClientException)):
+ error_text_list.append(task_instantiation_info[task] + ": {}".format(exc))
+ else:
+ exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
+ self.logger.error(logging_text + task_instantiation_info[task] + exc_traceback)
+ error_text_list.append(task_instantiation_info[task] + ": " + exc_traceback)
+ else:
+ self.logger.debug(logging_text + task_instantiation_info[task] + ": Done")
+
+ if error_text_list:
+ error_text = "\n".join(error_text_list)
db_nsr_update["detailed-status"] = error_text
- db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED_TEMP"
db_nslcmop_update["detailed-status"] = error_text
+ db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
db_nslcmop_update["statusEnteredTime"] = time()
else:
# all is done
+ db_nsr_update["detailed-status"] = "done"
+ db_nslcmop_update["detailed-status"] = "done"
db_nslcmop_update["operationState"] = nslcmop_operation_state = "COMPLETED"
db_nslcmop_update["statusEnteredTime"] = time()
- db_nslcmop_update["detailed-status"] = "done"
- db_nsr_update["config-status"] = "configured"
- db_nsr_update["detailed-status"] = "done"
except (ROclient.ROClientException, DbException, LcmException) as e:
self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
if db_nsr:
db_nsr_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
db_nsr_update["operational-status"] = "failed"
+ db_nsr_update["config-status"] = "failed"
if db_nslcmop:
db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
db_nsr_update["_admin.current-operation"] = None
db_nsr_update["_admin.operation-type"] = None
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ # nsState="READY/BROKEN", currentOperation="IDLE", currentOperationID=None
+ ns_state = None
+ error_description = None
+ if instantiated_ok:
+ ns_state = "READY"
+ else:
+ ns_state = "BROKEN"
+ error_description = 'Operation: INSTANTIATING.{}, step: {}'.format(nslcmop_id, step)
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation="IDLE",
+ current_operation_id=None,
+ error_description=error_description
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ error_message=error_description
+ )
+
if db_nslcmop_update:
self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
+
+ self.logger.debug(logging_text + 'End of instantiation: {}'.format(instantiated_ok))
+
except DbException as e:
self.logger.error(logging_text + "Cannot update database: {}".format(e))
+
if nslcmop_operation_state:
try:
await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
- async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs):
+ async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int, timeout: int = 3600) -> bool:
+
+ # steps:
+ # 1. find all relations for this VCA
+ # 2. wait for other peers related
+ # 3. add relations
+
+ try:
+
+ # STEP 1: find all relations for this VCA
+
+ # read nsr record
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # this VCA data
+ my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
+
+ # read all ns-configuration relations
+ ns_relations = list()
+ db_ns_relations = deep_get(db_nsr, ('nsd', 'ns-configuration', 'relation'))
+ if db_ns_relations:
+ for r in db_ns_relations:
+ # check if this VCA is in the relation
+ if my_vca.get('member-vnf-index') in\
+ (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
+ ns_relations.append(r)
+
+ # read all vnf-configuration relations
+ vnf_relations = list()
+ db_vnfd_list = db_nsr.get('vnfd-id')
+ if db_vnfd_list:
+ for vnfd in db_vnfd_list:
+ db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
+ db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
+ if db_vnf_relations:
+ for r in db_vnf_relations:
+ # check if this VCA is in the relation
+ if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
+ vnf_relations.append(r)
+
+ # if no relations, terminate
+ if not ns_relations and not vnf_relations:
+ self.logger.debug(logging_text + ' No relations')
+ return True
+
+ self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
+
+ # add all relations
+ start = time()
+ while True:
+ # check timeout
+ now = time()
+ if now - start >= timeout:
+ self.logger.error(logging_text + ' : timeout adding relations')
+ return False
+
+ # reload nsr from database (we need to update record: _admin.deloyed.VCA)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+
+ # for each defined NS relation, find the VCA's related
+ for r in ns_relations:
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
+ for vca in vca_list:
+ if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
+ and vca.get('config_sw_installed'):
+ from_vca_ee_id = vca.get('ee_id')
+ from_vca_endpoint = r.get('entities')[0].get('endpoint')
+ if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
+ and vca.get('config_sw_installed'):
+ to_vca_ee_id = vca.get('ee_id')
+ to_vca_endpoint = r.get('entities')[1].get('endpoint')
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.n2vc.add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint)
+ # remove entry from relations list
+ ns_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get('configurationStatus')
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # for each defined VNF relation, find the VCA's related
+ for r in vnf_relations:
+ from_vca_ee_id = None
+ to_vca_ee_id = None
+ from_vca_endpoint = None
+ to_vca_endpoint = None
+ vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
+ for vca in vca_list:
+ if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
+ from_vca_ee_id = vca.get('ee_id')
+ from_vca_endpoint = r.get('entities')[0].get('endpoint')
+ if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
+ to_vca_ee_id = vca.get('ee_id')
+ to_vca_endpoint = r.get('entities')[1].get('endpoint')
+ if from_vca_ee_id and to_vca_ee_id:
+ # add relation
+ await self.n2vc.add_relation(
+ ee_id_1=from_vca_ee_id,
+ ee_id_2=to_vca_ee_id,
+ endpoint_1=from_vca_endpoint,
+ endpoint_2=to_vca_endpoint)
+ # remove entry from relations list
+ vnf_relations.remove(r)
+ else:
+ # check failed peers
+ try:
+ vca_status_list = db_nsr.get('configurationStatus')
+ if vca_status_list:
+ for i in range(len(vca_list)):
+ vca = vca_list[i]
+ vca_status = vca_status_list[i]
+ if vca.get('vdu_id') == r.get('entities')[0].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ if vca.get('vdu_id') == r.get('entities')[1].get('id'):
+ if vca_status.get('status') == 'BROKEN':
+ # peer broken: remove relation from list
+ ns_relations.remove(r)
+ except Exception:
+ # ignore
+ pass
+
+ # wait for next try
+ await asyncio.sleep(5.0)
+
+ if not ns_relations and not vnf_relations:
+ self.logger.debug('Relations added')
+ break
+
+ return True
+
+ except Exception as e:
+ self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
+ return False
+
+ async def deploy_kdus(self, logging_text, nsr_id, db_nsr, db_vnfrs, db_vnfds):
# Launch kdus if present in the descriptor
+ deployed_ok = True
+
k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
def _get_cluster_id(cluster_id, cluster_type):
k8sclustertype = None
error_text = None
cluster_uuid = None
+ vnfd_id = vnfr_data.get('vnfd-id')
+ pkgdir = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage', 'pkg-dir'))
if kdur.get("helm-chart"):
kdumodel = kdur["helm-chart"]
k8sclustertype = "chart"
k8sclustertype = "juju"
k8sclustertype_full = "juju-bundle"
else:
- error_text = "kdu type is neither helm-chart not juju-bundle. Maybe an old NBI version is" \
+ error_text = "kdu type is neither helm-chart nor juju-bundle. Maybe an old NBI version is" \
" running"
+ # check if kdumodel is a file and exists
+ try:
+ # path format: /vnfdid/pkkdir/kdumodel
+ filename = '{}/{}/{}s/{}'.format(vnfd_id, pkgdir, k8sclustertype_full, kdumodel)
+ if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
+ kdumodel = self.fs.path + filename
+ except Exception:
+ # it is not a file
+ pass
try:
if not error_text:
cluster_uuid = _get_cluster_id(kdur["k8s-cluster"]["id"], k8sclustertype_full)
except LcmException as e:
error_text = str(e)
+ deployed_ok = False
+
step = "Instantiate KDU {} in k8s cluster {}".format(kdur["kdu-name"], cluster_uuid)
k8s_instace_info = {"kdu-instance": None, "k8scluster-uuid": cluster_uuid,
params=desc_params, db_dict=db_dict, timeout=3600)
)
else:
- task = self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
- atomic=True, params=desc_params,
- db_dict=db_dict, timeout=600)
+ task = asyncio.ensure_future(
+ self.k8sclusterjuju.install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
+ atomic=True, params=desc_params,
+ db_dict=db_dict, timeout=600,
+ kdu_name=kdur["kdu-name"])
+ )
pending_tasks[task] = "_admin.deployed.K8s.{}.".format(index)
index += 1
- if not pending_tasks:
- return
- self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
- pending_list = list(pending_tasks.keys())
- while pending_list:
- done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
- return_when=asyncio.FIRST_COMPLETED)
- if not done_list: # timeout
- for task in pending_list:
- db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
- break
- for task in done_list:
- exc = task.exception()
- if exc:
- db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
- else:
- db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
+
+ if pending_tasks:
+ self.logger.debug(logging_text + 'Waiting for terminate pending tasks...')
+ pending_list = list(pending_tasks.keys())
+ while pending_list:
+ done_list, pending_list = await asyncio.wait(pending_list, timeout=30*60,
+ return_when=asyncio.FIRST_COMPLETED)
+ if not done_list: # timeout
+ for task in pending_list:
+ db_nsr_update[pending_tasks(task) + "detailed-status"] = "Timeout"
+ deployed_ok = False
+ break
+ for task in done_list:
+ exc = task.exception()
+ if exc:
+ db_nsr_update[pending_tasks[task] + "detailed-status"] = "{}".format(exc)
+ deployed_ok = False
+ else:
+ db_nsr_update[pending_tasks[task] + "kdu-instance"] = task.result()
+
+ if not deployed_ok:
+ raise LcmException('Cannot deploy KDUs')
except Exception as e:
- self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e))
- raise LcmException("{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e))
+ msg = "{} Exit Exception {} while '{}': {}".format(logging_text, type(e).__name__, step, e)
+ self.logger.error(msg)
+ raise LcmException(msg)
finally:
- # TODO Write in data base
if db_nsr_update:
self.update_db_2("nsrs", nsr_id, db_nsr_update)
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
- base_folder, task_instantiation_list):
+ base_folder, task_instantiation_list, task_instantiation_info):
# launch instantiate_N2VC in a asyncio task and register task object
# Look where information of this charm is at database <nsrs>._admin.deployed.VCA
# if not found, create one entry and update database
"vdu_name": vdu_name,
}
vca_index += 1
- self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed})
+
+ # create VCA and configurationStatus in db
+ db_dict = {
+ "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
+ "configurationStatus.{}".format(vca_index): dict()
+ }
+ self.update_db_2("nsrs", nsr_id, db_dict)
+
db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
# Launch task
deploy_params=deploy_params,
config_descriptor=descriptor_config,
base_folder=base_folder,
+ nslcmop_id=nslcmop_id
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
+ task_instantiation_info[task_n2vc] = "Deploy VCA {}.{}".format(member_vnf_index or "", vdu_id or "")
task_instantiation_list.append(task_n2vc)
# Check if this VNFD has a configured terminate action
# or op_index (operationState != 'COMPLETED')
return self._reintent_or_skip_suboperation(db_nslcmop, op_index)
+ # Function to return execution_environment id
+
+ def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
+ for vca in vca_deployed_list:
+ if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
+ return vca["ee_id"]
+
# Helper methods for terminate()
async def _terminate_action(self, db_nslcmop, nslcmop_id, nsr_id):
Called from terminate() before deleting instance
Calls action() to execute the primitive """
logging_text = "Task ns={} _terminate_action={} ".format(nsr_id, nslcmop_id)
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
db_vnfds = {}
# Loop over VNFRs
# " primitive={} fails with error {}".format(
# vnf_index, seq.get("name"), result_detail))
- # TODO: find ee_id
- ee_id = None
+ ee_id = self._get_ee_id(vnf_index, vdu_id, vca_deployed_list)
try:
await self.n2vc.exec_primitive(
ee_id=ee_id,
.format(vnf_index, seq.get("name"), e),
)
+ async def _delete_N2VC(self, nsr_id: str):
+ self._write_all_config_status(nsr_id=nsr_id, status='TERMINATING')
+ namespace = "." + nsr_id
+ await self.n2vc.delete_namespace(namespace=namespace)
+ self._write_all_config_status(nsr_id=nsr_id, status='DELETED')
+
async def terminate(self, nsr_id, nslcmop_id):
# Try to lock HA task here
step = "Waiting for previous operations to terminate"
await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state="TERMINATING",
+ current_operation="TERMINATING",
+ current_operation_id=nslcmop_id
+ )
+ self._write_op_status(
+ op_id=nslcmop_id,
+ queuePosition=0
+ )
+
step = "Getting nslcmop={} from db".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
step = "Getting nsr={} from db".format(nsr_id)
step = "delete execution environment"
self.logger.debug(logging_text + step)
- task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
+ task_delete_ee = asyncio.ensure_future(self._delete_N2VC(nsr_id=nsr_id))
+ # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
+
pending_tasks.append(task_delete_ee)
except Exception as e:
- msg = "Failed while deleting NS in VCA: {}".format(e)
+ msg = "Failed while deleting ns={} in VCA: {}".format(nsr_id, e)
self.logger.error(msg)
failed_detail.append(msg)
continue
pending_tasks.append(task_delete_kdu_instance)
except LcmException as e:
- msg = "Failed while deleting KDUs from NS: {}".format(e)
+ msg = "Failed while deleting KDUs from ns={}: {}".format(nsr_id, e)
self.logger.error(msg)
failed_detail.append(msg)
item_id_name=RO_nsr_id,
extra_item="action",
extra_item_id=RO_delete_action)
+
+ # deploymentStatus
+ self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
+
ns_status, ns_status_info = self.RO.check_action_status(desc)
if ns_status == "ERROR":
raise ROclient.ROClientException(ns_status_info)
self.logger.error(logging_text + failed_detail[-1])
if failed_detail:
+ terminate_ok = False
self.logger.error(logging_text + " ;".join(failed_detail))
db_nsr_update["operational-status"] = "failed"
db_nsr_update["detailed-status"] = "Deletion errors " + "; ".join(failed_detail)
db_nslcmop_update["operationState"] = nslcmop_operation_state = "FAILED"
db_nslcmop_update["statusEnteredTime"] = time()
else:
+ terminate_ok = True
db_nsr_update["operational-status"] = "terminated"
db_nsr_update["detailed-status"] = "Done"
db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
db_nsr_update["_admin.current-operation"] = None
db_nsr_update["_admin.operation-type"] = None
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ if terminate_ok:
+ ns_state = "IDLE"
+ error_description = None
+ error_detail = None
+ else:
+ ns_state = "BROKEN"
+ error_detail = "; ".join(failed_detail)
+ error_description = 'Operation: TERMINATING.{}, step: {}. Detail: {}'\
+ .format(nslcmop_id, step, error_detail)
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=ns_state,
+ current_operation="IDLE",
+ current_operation_id=None,
+ error_description=error_description
+ )
+
+ self._write_op_status(
+ op_id=nslcmop_id,
+ error_message=error_description
+ )
+
except DbException as e:
self.logger.error(logging_text + "Cannot update database: {}".format(e))
if nslcmop_operation_state:
step = "Waiting for previous operations to terminate"
await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="RUNNING ACTION",
+ current_operation_id=nslcmop_id
+ )
+
step = "Getting information from database"
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
break
elif kdu_name:
self.logger.debug(logging_text + "Checking actions in KDUs")
- kdur = next((x for x in db_vnfr["kdur"] if x["kdu_name"] == kdu_name), None)
+ kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
desc_params = self._format_additional_params(kdur.get("additionalParams")) or {}
if primitive_params:
desc_params.update(primitive_params)
db_nsr_update["_admin.nslcmop"] = None
db_nsr_update["_admin.current-operation"] = None
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None
+ )
+ if exc:
+ self._write_op_status(
+ op_id=nslcmop_id,
+ error_message=nslcmop_operation_state_detail
+ )
except DbException as e:
self.logger.error(logging_text + "Cannot update database: {}".format(e))
self.logger.debug(logging_text + "Exit")
step = "Waiting for previous operations to terminate"
await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="SCALING",
+ current_operation_id=nslcmop_id
+ )
+
step = "Getting nslcmop from database"
self.logger.debug(step + " after having waited for previous tasks to be completed")
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
if not RO_task_done:
desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
extra_item_id=RO_nslcmop_id)
+
+ # deploymentStatus
+ self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
+
ns_status, ns_status_info = self.RO.check_action_status(desc)
if ns_status == "ERROR":
raise ROclient.ROClientException(ns_status_info)
vnfr_scaled = True
try:
desc = await self.RO.show("ns", RO_nsr_id)
+
+ # deploymentStatus
+ self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
+
# nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
break
# Post-scale reintent check: Check if this sub-operation has been executed before
op_index = self._check_or_add_scale_suboperation(
db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
- if (op_index == self.SUBOPERATION_STATUS_SKIP):
+ if op_index == self.SUBOPERATION_STATUS_SKIP:
# Skip sub-operation
result = 'COMPLETED'
result_detail = 'Done'
"vnf_config_primitive={} Skipped sub-operation, result {} {}".
format(vnf_config_primitive, result, result_detail))
else:
- if (op_index == self.SUBOPERATION_STATUS_NEW):
+ if op_index == self.SUBOPERATION_STATUS_NEW:
# New sub-operation: Get index of this sub-operation
op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
exc = traceback.format_exc()
self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
finally:
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None
+ )
if exc:
if db_nslcmop:
db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
db_nsr_update["_admin.operation-type"] = None
db_nsr_update["_admin.nslcmop"] = None
self.update_db_2("nsrs", nsr_id, db_nsr_update)
+
+ self._write_ns_status(
+ nsr_id=nsr_id,
+ ns_state=None,
+ current_operation="IDLE",
+ current_operation_id=None
+ )
+
except DbException as e:
self.logger.error(logging_text + "Cannot update database: {}".format(e))
if nslcmop_operation_state: