From: quilesj Date: Tue, 8 Oct 2019 11:34:55 +0000 (+0200) Subject: Incorporate new N2VC API to LCM X-Git-Tag: v7.0.0rc1~17^2~2 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=refs%2Fchanges%2F96%2F8096%2F6;p=osm%2FLCM.git Incorporate new N2VC API to LCM Change-Id: I25200384b48948fb59850b8c637e3b1e1f43d312 Signed-off-by: quilesj --- diff --git a/osm_lcm/__init__.py b/osm_lcm/__init__.py index 1c51076..3cf7857 100644 --- a/osm_lcm/__init__.py +++ b/osm_lcm/__init__.py @@ -13,5 +13,5 @@ ## # version moved to lcm.py. uncomment if LCM is installed as library and installed -version = '6.0.2.post2' -version_date = '2018-09-11' +version = '6.0.2.post10' +version_date = '2019-10-08' diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 48c07eb..a1fb152 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -17,6 +17,11 @@ # under the License. ## + +# DEBUG WITH PDB +import os +import pdb + import asyncio import yaml import logging @@ -24,7 +29,11 @@ import logging.handlers import getopt import sys -from osm_lcm import ROclient, ns, vim_sdn, netslice +from osm_lcm import ns +from osm_lcm import vim_sdn +from osm_lcm import netslice +from osm_lcm import ROclient + from time import time, sleep from osm_lcm.lcm_utils import versiontuple, LcmException, TaskRegistry, LcmExceptionExit from osm_lcm import version as lcm_version, version_date as lcm_version_date @@ -38,10 +47,14 @@ from os import environ, path from random import choice as random_choice from n2vc import version as n2vc_version +if os.getenv('OSMLCM_PDB_DEBUG', None) is not None: + pdb.set_trace() + __author__ = "Alfonso Tierno" min_RO_version = "6.0.2" min_n2vc_version = "0.0.2" + min_common_version = "0.1.19" # uncomment if LCM is installed as library and installed, and get them from __init__.py # lcm_version = '0.1.41' @@ -149,6 +162,7 @@ class Lcm: raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format( config["storage"]["driver"])) + # copy message configuration in order to remove 'group_id' for msg_admin config_message = config["message"].copy() config_message["loop"] = self.loop if config_message["driver"] == "local": @@ -530,7 +544,7 @@ class Lcm: def usage(): print("""Usage: {} [options] - -c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg) + -c|--config [configuration_file]: loads the configuration file (default: ./lcm.cfg) --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy -h|--help: shows this help """.format(sys.argv[0])) @@ -556,8 +570,14 @@ def health_check(): if __name__ == '__main__': + try: # load parameters and configuration + # -h + # -c value + # --config value + # --help + # --health-check opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"]) # TODO add "log-socket-host=", "log-socket-port=", "log-file=" config_file = None @@ -577,9 +597,10 @@ if __name__ == '__main__': # log_file = a else: assert False, "Unhandled option" + if config_file: if not path.isfile(config_file): - print("configuration file '{}' not exist".format(config_file), file=sys.stderr) + print("configuration file '{}' does not exist".format(config_file), file=sys.stderr) exit(1) else: for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"): diff --git a/osm_lcm/ns.py b/osm_lcm/ns.py index 13c1404..2c603f4 100644 --- a/osm_lcm/ns.py +++ b/osm_lcm/ns.py @@ -28,7 +28,8 @@ from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase from osm_common.dbbase import DbException from osm_common.fsbase import FsException -from n2vc.vnf import N2VC, N2VCPrimitiveExecutionFailed, NetworkServiceDoesNotExist + +from n2vc.n2vc_juju_conn import N2VCJujuConnector from copy import copy, deepcopy from http import HTTPStatus @@ -97,32 +98,64 @@ class NsLcm(LcmBase): :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', :return: None """ - # logging - self.logger = logging.getLogger('lcm.ns') + super().__init__( + db=db, + msg=msg, + fs=fs, + logger=logging.getLogger('lcm.ns') + ) + self.loop = loop self.lcm_tasks = lcm_tasks - - super().__init__(db, msg, fs, self.logger) - self.ro_config = ro_config - - self.n2vc = N2VC( + self.vca_config = vca_config + if 'pubkey' in self.vca_config: + self.vca_config['public_key'] = self.vca_config['pubkey'] + if 'cacert' in self.vca_config: + self.vca_config['ca_cert'] = self.vca_config['cacert'] + + # create N2VC connector + self.n2vc = N2VCJujuConnector( + db=self.db, + fs=self.fs, log=self.logger, - server=vca_config['host'], - port=vca_config['port'], - user=vca_config['user'], - secret=vca_config['secret'], - # TODO: This should point to the base folder where charms are stored, - # if there is a common one (like object storage). Otherwise, leave - # it unset and pass it via DeployCharms - # artifacts=vca_config[''], - artifacts=None, - juju_public_key=vca_config.get('pubkey'), - ca_cert=vca_config.get('cacert'), - api_proxy=vca_config.get('apiproxy') + loop=self.loop, + url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']), + username=self.vca_config.get('user', None), + vca_config=self.vca_config, + on_update_db=self._on_update_n2vc_db + # TODO + # New N2VC argument + # api_proxy=vca_config.get('apiproxy') ) + + # create RO client self.RO = ROclient.ROClient(self.loop, **self.ro_config) + def _on_update_n2vc_db(self, table, filter, path, updated_data): + + self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={})' + .format(table, filter, path, updated_data)) + + # write NS status to database + try: + nsrs_id = filter.get('_id') + # get ns record + nsr = self.db.get_one(table=table, q_filter=filter) + # get VCA deployed list + vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA')) + # get RO deployed + ro_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'RO')) + for vca in vca_list: + status = vca.get('status') + detailed_status = vca.get('detailed-status') + for ro in ro_list: + pass + + except Exception as e: + self.logger.error('_on_update_n2vc_db(table={},filter={},path={},updated_data={}) Error updating db: {}' + .format(table, filter, path, updated_data, e)) + def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None): """ Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd @@ -179,85 +212,6 @@ class NsLcm(LcmBase): raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}". format(vnfd["id"], vdu["id"], e)) - def n2vc_callback(self, model_name, application_name, status, message, n2vc_info, task=None): - """ - Callback both for charm status change and task completion - :param model_name: Charm model name - :param application_name: Charm application name - :param status: Can be - - blocked: The unit needs manual intervention - - maintenance: The unit is actively deploying/configuring - - waiting: The unit is waiting for another charm to be ready - - active: The unit is deployed, configured, and ready - - error: The charm has failed and needs attention. - - terminated: The charm has been destroyed - - removing, - - removed - :param message: detailed message error - :param n2vc_info: dictionary with information shared with instantiate task. It contains: - nsr_id: - nslcmop_id: - lcmOperationType: currently "instantiate" - deployed: dictionary with {: {operational-status: , detailed-status: }} - db_update: dictionary to be filled with the changes to be wrote to database with format key.key.key: value - n2vc_event: event used to notify instantiation task that some change has been produced - :param task: None for charm status change, or task for completion task callback - :return: - """ - try: - nsr_id = n2vc_info["nsr_id"] - deployed = n2vc_info["deployed"] - db_nsr_update = n2vc_info["db_update"] - nslcmop_id = n2vc_info["nslcmop_id"] - ns_operation = n2vc_info["lcmOperationType"] - n2vc_event = n2vc_info["n2vc_event"] - logging_text = "Task ns={} {}={} [n2vc_callback] application={}".format(nsr_id, ns_operation, nslcmop_id, - application_name) - for vca_index, vca_deployed in enumerate(deployed): - if not vca_deployed: - continue - if model_name == vca_deployed["model"] and application_name == vca_deployed["application"]: - break - else: - self.logger.error(logging_text + " Not present at nsr._admin.deployed.VCA. Received model_name={}". - format(model_name)) - return - if task: - if task.cancelled(): - self.logger.debug(logging_text + " task Cancelled") - vca_deployed['operational-status'] = "error" - db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error" - vca_deployed['detailed-status'] = "Task Cancelled" - db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = "Task Cancelled" - - elif task.done(): - exc = task.exception() - if exc: - self.logger.error(logging_text + " task Exception={}".format(exc)) - vca_deployed['operational-status'] = "error" - db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = "error" - vca_deployed['detailed-status'] = str(exc) - db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(exc) - else: - self.logger.debug(logging_text + " task Done") - # task is Done, but callback is still ongoing. So ignore - return - elif status: - self.logger.debug(logging_text + " Enter status={} message={}".format(status, message)) - if vca_deployed['operational-status'] == status: - return # same status, ignore - vca_deployed['operational-status'] = status - db_nsr_update["_admin.deployed.VCA.{}.operational-status".format(vca_index)] = status - vca_deployed['detailed-status'] = str(message) - db_nsr_update["_admin.deployed.VCA.{}.detailed-status".format(vca_index)] = str(message) - else: - self.logger.critical(logging_text + " Enter with bad parameters", exc_info=True) - return - # wake up instantiate task - n2vc_event.set() - except Exception as e: - self.logger.critical(logging_text + " Exception {}".format(e), exc_info=True) - def ns_params_2_RO(self, ns_params, nsd, vnfd_dict, n2vc_key_list): """ Creates a RO ns descriptor from OSM ns_instantiate params @@ -323,6 +277,7 @@ class NsLcm(LcmBase): "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")), # "scenario": ns_params["nsdId"], } + n2vc_key_list = n2vc_key_list or [] for vnfd_ref, vnfd in vnfd_dict.items(): vdu_needed_access = [] @@ -478,7 +433,7 @@ class NsLcm(LcmBase): for vld_id, instance_scenario_id in vld_params["ns-net"].items(): RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id} if RO_vld_ns_net: - populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net) + populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net) if "vnfd-connection-point-ref" in vld_params: for cp_params in vld_params["vnfd-connection-point-ref"]: # look for interface @@ -623,8 +578,8 @@ class NsLcm(LcmBase): break else: raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} " - "from VIM info".format(vnf_index, vdur["vdu-id-ref"], - ifacer["name"])) + "from VIM info" + .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"])) vnfr_update["vdur.{}".format(vdu_index)] = vdur break else: @@ -651,15 +606,17 @@ class NsLcm(LcmBase): else: raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index)) - async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref, - n2vc_key_list): + async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, + db_nslcmop, db_vnfrs, db_vnfds_ref, n2vc_key_list): + db_nsr_update = {} RO_descriptor_number = 0 # number of descriptors created at RO vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO start_deploy = time() ns_params = db_nslcmop.get("operationParams") - + # deploy RO + # get vnfds, instantiate at RO for c_vnf in nsd.get("constituent-vnfd", ()): member_vnf_index = c_vnf["member-vnf-index"] @@ -774,9 +731,7 @@ class NsLcm(LcmBase): RO_ns_params = self.ns_params_2_RO(ns_params, nsd, db_vnfds_ref, n2vc_key_list) step = db_nsr_update["detailed-status"] = "Deploying ns at VIM" - desc = await self.RO.create("ns", descriptor=RO_ns_params, - name=db_nsr["name"], - scenario=RO_nsd_uuid) + desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid) RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"] db_nsr_update["_admin.nsState"] = "INSTANTIATED" db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD" @@ -816,22 +771,33 @@ class NsLcm(LcmBase): self.ns_update_nsr(db_nsr_update, db_nsr, desc) db_nsr_update["operational-status"] = "running" - db_nsr["detailed-status"] = "Configuring vnfr" + db_nsr["detailed-status"] = "Deployed at VIM" + db_nsr_update["detailed-status"] = "Deployed at VIM" self.update_db_2("nsrs", nsr_id, db_nsr_update) - - async def insert_key_ro(self, logging_text, pub_key, nsr_id, vnfr_id, vdu_id, vdu_index, user): + + step = "Deployed at VIM" + self.logger.debug(logging_text + step) + + # wait for ip addres at RO, and optionally, insert public key in virtual machine + # returns IP address + async def insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None): + ro_nsr_id = None ip_address = None nb_tries = 0 target_vdu_id = None + while True: + await asyncio.sleep(10, loop=self.loop) - # 1 wait until NS is deployed at RO + # wait until NS is deployed at RO if not ro_nsr_id: db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id}) ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id")) if not ro_nsr_id: continue + + # get ip address if not target_vdu_id: db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id}) if not vdu_id: @@ -850,35 +816,54 @@ class NsLcm(LcmBase): raise LcmException("Not found vnfr_id={}, vdu_index={}, vdu_index={}".format( vnfr_id, vdu_id, vdu_index )) + if not target_vdu_id: continue - try: - ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index - result_dict = await self.RO.create_action("ns", ro_nsr_id, - {"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}) - # result_dict contains the format {VM-id: {vim_result: 200, description: text}} - if not result_dict or not isinstance(result_dict, dict): - raise LcmException("Unknown response from RO when injecting key") - for result in result_dict.values(): - if result.get("vim_result") == 200: - break - else: - raise ROclient.ROClientException("error injecting key: {}".format( - result.get("description"))) - return - except ROclient.ROClientException as e: - nb_tries += 1 - if nb_tries >= 10: - raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) - self.logger.debug(logging_text + "error injecting key: {}".format(e)) - async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, vdu_index, - deploy_params, config_descriptor, base_folder): + # inject public key into machine + if pub_key and user: + try: + ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index + result_dict = await self.RO.create_action( + item="ns", + item_id_name=ro_nsr_id, + descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user} + ) + # result_dict contains the format {VM-id: {vim_result: 200, description: text}} + if not result_dict or not isinstance(result_dict, dict): + raise LcmException("Unknown response from RO when injecting key") + for result in result_dict.values(): + if result.get("vim_result") == 200: + break + else: + raise ROclient.ROClientException("error injecting key: {}".format( + result.get("description"))) + break + except ROclient.ROClientException as e: + nb_tries += 1 + if nb_tries >= 10: + raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) + self.logger.debug(logging_text + "error injecting key: {}".format(e)) + else: + # no user or pub_key ---> break while true + break + + return ip_address + + async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, + vdu_index, config_descriptor, deploy_params, base_folder): nsr_id = db_nsr["_id"] db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index) vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index] - logging_text += "member_vnf_index={} vdu_id={}, vdu_index={} ".format(db_vnfr["member-vnf-index-ref"], - vdu_id, vdu_index) + db_dict = { + 'collection': 'nsrs', + 'filter': {'_id': nsr_id}, + 'path': db_update_entry + } + + logging_text += "member_vnf_index={} vdu_id={}, vdu_index={} "\ + .format(db_vnfr["member-vnf-index-ref"], vdu_id, vdu_index) + step = "" try: vnfr_id = None @@ -894,138 +879,223 @@ class NsLcm(LcmBase): namespace += ".{}-{}".format(vdu_id, vdu_index or 0) # Get artifact path - storage_params = self.fs.get_params() - artifact_path = "{}{}/{}/charms/{}".format( - storage_params["path"], + artifact_path = "/{}/{}/charms/{}".format( base_folder["folder"], base_folder["pkg-dir"], config_descriptor["juju"]["charm"] ) - is_proxy_charm = True if config_descriptor["juju"]["charm"] else False - if config_descriptor["juju"].get("proxy") is False: + is_proxy_charm = deep_get(config_descriptor, ('juju', 'charm')) is not None + if deep_get(config_descriptor, ('juju', 'proxy')) is False: is_proxy_charm = False # n2vc_redesign STEP 3.1 + + # find old ee_id if exists ee_id = vca_deployed.get("ee_id") - if not ee_id: - if is_proxy_charm: - step = "create execution envioronment" - self.logger.debug(logging_text + step) - ee_id = await self.n2vc.CreateExecutionEnvironment(namespace) - else: - step = "register execution envioronment" - # TODO wait until deployed by RO, when IP address has been filled. By pooling???? - credentials = {} # TODO db_credentials["ip_address"] - # get username - # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were - # merged. Meanwhile let's get username from initial-config-primitive - if config_descriptor.get("initial-config-primitive"): - for param in config_descriptor["initial-config-primitive"][0].get("parameter", ()): - if param["name"] == "ssh-username": - credentials["username"] = param["value"] - if config_descriptor.get("config-access") and config_descriptor["config-access"].get("ssh-access"): - if config_descriptor["config-access"]["ssh-access"].get("required"): - credentials["username"] = \ - config_descriptor["config-access"]["ssh-access"].get("default-user") - - # n2vc_redesign STEP 3.2 - self.logger.debug(logging_text + step) - ee_id = await self.n2vc.RegisterExecutionEnvioronment(credentials) - # for compatibility with MON/POL modules, the need model and application name at database - # TODO ask to N2VC instead of assuming the format "model_name.application_name" - model_name, _, application_name = ee_id.partition(".") - self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name, - db_update_entry + "application": application_name, - db_update_entry + "ee_id": ee_id}) + # create or register execution environment in VCA + if is_proxy_charm: + step = "create execution environment" + self.logger.debug(logging_text + step) + ee_id, credentials = await self.n2vc.create_execution_environment( + namespace=namespace, + reuse_ee_id=ee_id, + db_dict=db_dict + ) + + else: + step = "register execution envioronment" + # TODO wait until deployed by RO, when IP address has been filled. By pooling???? + credentials = {} # TODO db_credentials["ip_address"] + # get username + # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were + # merged. Meanwhile let's get username from initial-config-primitive + if config_descriptor.get("initial-config-primitive"): + for param in config_descriptor["initial-config-primitive"][0].get("parameter", ()): + if param["name"] == "ssh-username": + credentials["username"] = param["value"] + if config_descriptor.get("config-access") and config_descriptor["config-access"].get("ssh-access"): + if config_descriptor["config-access"]["ssh-access"].get("required"): + credentials["username"] = \ + config_descriptor["config-access"]["ssh-access"].get("default-user") + + # n2vc_redesign STEP 3.2 + self.logger.debug(logging_text + step) + ee_id = await self.n2vc.register_execution_environment( + credentials=credentials, + namespace=namespace, + db_dict=db_dict + ) + + # for compatibility with MON/POL modules, the need model and application name at database + # TODO ask to N2VC instead of assuming the format "model_name.application_name" + ee_id_parts = ee_id.split('.') + model_name = ee_id_parts[0] + application_name = ee_id_parts[1] + self.update_db_2("nsrs", nsr_id, {db_update_entry + "model": model_name, + db_update_entry + "application": application_name, + db_update_entry + "ee_id": ee_id}) # n2vc_redesign STEP 3.3 # TODO check if already done step = "Install configuration Software" - # TODO db_dict with filter: 'nsr_id', path:'db_update_entry' - pub_key = await self.n2vc.InstallConfigurationSW(self, ee_id, artifact_path) - if is_proxy_charm and pub_key: - # check if ssh key must be injected - required = deep_get(config_descriptor, ("config-access", "ssh-access", "required")) - # TODO get from config primitive: if required not user: - if required: - user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) - # insert pub_key into VM - # n2vc_redesign STEP 5.1 - step = "Insert public key into VM" - await self.insert_key_ro(logging_text, pub_key, nsr_id, vnfr_id, vdu_id, vdu_index, user) - # TODO get ip_address and add to deploy params - deploy_params["rw_mgmt_ip"] = "TODO-ip-address" + self.logger.debug(logging_text + step) + await self.n2vc.install_configuration_sw( + ee_id=ee_id, + artifact_path=artifact_path, + db_dict=db_dict + ) + + # if SSH access is required, then get execution environment SSH public + required = deep_get(config_descriptor, ("config-access", "ssh-access", "required")) + if is_proxy_charm and required: + + pub_key = None + pub_key = await self.n2vc.get_ee_ssh_public__key( + ee_id=ee_id, + db_dict=db_dict + ) + + user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user")) + # insert pub_key into VM + # n2vc_redesign STEP 5.1 + step = "Insert public key into VM" + self.logger.debug(logging_text + step) + + # wait for RO (ip-address) + rw_mgmt_ip = await self.insert_key_ro( + logging_text=logging_text, + nsr_id=nsr_id, + vnfr_id=vnfr_id, + vdu_id=vdu_id, + vdu_index=vdu_index, + user=user, + pub_key=pub_key + ) + + # store rw_mgmt_ip in deploy params for later substitution + self.logger.debug('rw_mgmt_ip={}'.format(rw_mgmt_ip)) + deploy_params["rw_mgmt_ip"] = rw_mgmt_ip # n2vc_redesign STEP 6 Execute initial config primitive initial_config_primitive_list = config_descriptor.get('initial-config-primitive', []) + step = 'execute initial config primitive' + + # sort initial config primitives by 'seq' + try: + initial_config_primitive_list.sort(key=lambda val: int(val['seq'])) + except Exception: + self.logger.warn(logging_text + 'Cannot sort by "seq" field' + step) + for initial_config_primitive in initial_config_primitive_list: # TODO check if already done primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params) step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_) self.logger.debug(logging_text + step) - await self.n2vc.ExecutePrimitive(ee_id, initial_config_primitive["name"], primitive_params_) + await self.n2vc.exec_primitive( + ee_id=ee_id, + primitive_name=initial_config_primitive["name"], + params_dict=primitive_params_, + db_dict=db_dict + ) # TODO register in database that primitive is done + + step = "instantiated at VCA" + self.logger.debug(logging_text + step) + except Exception as e: # TODO not use Exception but N2VC exception - raise Exception("{} {}".format(step, e)) + raise Exception("{} {}".format(step, e)) from e # TODO raise N2VC exception with 'step' extra information async def instantiate(self, nsr_id, nslcmop_id): + """ + + :param nsr_id: ns instance to deploy + :param nslcmop_id: operation to run + :return: + """ + # Try to lock HA task here task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id) if not task_is_locked_by_me: + self.logger.debug('instantiate() task is not locked by me') return logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id) self.logger.debug(logging_text + "Enter") + # get all needed from database - # start_deploy = time() + + # database nsrs record db_nsr = None + + # database nslcmops record db_nslcmop = None + + # update operation on nsrs db_nsr_update = {"_admin.nslcmop": nslcmop_id} + + # update operation on nslcmops db_nslcmop_update = {} + nslcmop_operation_state = None - db_vnfrs = {} + db_vnfrs = {} # vnf's info indexed by member-index + # n2vc_info = {} + # n2vc_key_list = [] # list of public keys to be injected as authorized to VMs task_instantiation_list = [] exc = None try: # wait for any previous tasks in process step = "Waiting for previous tasks" await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id) - # STEP 0: Reding database + + # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds) + + # read from db: operation step = "Getting nslcmop={} from db".format(nslcmop_id) db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id}) + + # read from db: ns step = "Getting nsr={} from db".format(nsr_id) db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + # nsd is replicated into ns (no db read) nsd = db_nsr["nsd"] # nsr_name = db_nsr["name"] # TODO short-name?? + # read from db: vnf's of this ns step = "Getting vnfrs from db" db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}) - db_vnfds_ref = {} - db_vnfds = {} - db_vnfds_index = {} + + # read from db: vnfd's for every vnf + db_vnfds_ref = {} # every vnfd data indexed by vnf name + db_vnfds = {} # every vnfd data indexed by vnf id + db_vnfds_index = {} # every vnfd data indexed by vnf member-index + + # for each vnf in ns, read vnfd for vnfr in db_vnfrs_list: - db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr - vnfd_id = vnfr["vnfd-id"] - vnfd_ref = vnfr["vnfd-ref"] + db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc + vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf + vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf + # if we haven't this vnfd, read it from db if vnfd_id not in db_vnfds: + # read from cb step = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref) vnfd = self.db.get_one("vnfds", {"_id": vnfd_id}) - db_vnfds_ref[vnfd_ref] = vnfd - db_vnfds[vnfd_id] = vnfd - db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] - # Get or generates the _admin.deployed,VCA list + # store vnfd + db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name + db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id + db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index + + # Get or generates the _admin.deployed.VCA list vca_deployed_list = None - # vca_model_name = None if db_nsr["_admin"].get("deployed"): vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA") - # vca_model_name = db_nsr["_admin"]["deployed"].get("VCA-model-name") if vca_deployed_list is None: vca_deployed_list = [] db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list + # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list) elif isinstance(vca_deployed_list, dict): # maintain backward compatibility. Change a dict to list at database @@ -1035,6 +1105,7 @@ class NsLcm(LcmBase): db_nsr_update["detailed-status"] = "creating" db_nsr_update["operational-status"] = "init" + if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list): populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), []) db_nsr_update["_admin.deployed.RO.vnfd"] = [] @@ -1045,79 +1116,39 @@ class NsLcm(LcmBase): # n2vc_redesign STEP 1 Get VCA public ssh-key # feature 1429. Add n2vc public key to needed VMs - n2vc_key = await self.n2vc.GetPublicKey() + n2vc_key = await self.n2vc.get_public_key() # n2vc_redesign STEP 2 Deploy Network Scenario task_ro = asyncio.ensure_future( - self.instantiate_RO(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref, [n2vc_key]) + self.instantiate_RO( + logging_text=logging_text, + nsr_id=nsr_id, + nsd=nsd, + db_nsr=db_nsr, + db_nslcmop=db_nslcmop, + db_vnfrs=db_vnfrs, + db_vnfds_ref=db_vnfds_ref, + n2vc_key_list=[n2vc_key] + ) ) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro) task_instantiation_list.append(task_ro) - + # n2vc_redesign STEP 3 to 6 Deploy N2VC step = "Looking for needed vnfd to configure with proxy charm" self.logger.debug(logging_text + step) - def _deploy_n2vc(): - # launch instantiate_N2VC in a asyncio task and register task object - # Look where information of this charm is at database ._admin.deployed.VCA - # if not found, create one entry and update database - # read variables logging_text, nsi_id, db_nsr, db_vnfr, vdu_id, vdu_index, deploy_params, - # descriptor_config, base_folder, member_vnf_index, vnfd_id - # write variables: db_nsr - - # fill db_nsr._admin.deployed.VCA. - vca_index = -1 - for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]): - if not vca_deployed: - continue - if vca_deployed.get("member-vnf-index") == member_vnf_index and \ - vca_deployed.get("vdu_id") == vdu_id and \ - vca_deployed.get("vdu_count_index", 0) == vdu_index: - break - else: - # not found, create one. - vca_deployed = { - "member-vnf-index": member_vnf_index, - "vdu_id": vdu_id, - "vdu_count_index": vdu_index, - "operational-status": "init", # TODO revise - "detailed-status": "", # TODO revise - "step": "initial-deploy", # TODO revise - "vnfd_id": vnfd_id, - "vdu_name": vdu_name, - } - vca_index += 1 - self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed}) - db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed) - - # Launch task - task_n2vc = asyncio.ensure_future( - self.instantiate_N2VC(logging_text, - vca_index, - nsi_id, - db_nsr, - db_vnfr, - vdu_id, - vdu_index, - deploy_params, - descriptor_config, - base_folder, - ) - ) - self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc) - task_instantiation_list.append(task_n2vc) - nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI + # get_iterable() returns a value from a dict or empty tuple if key does not exist for c_vnf in get_iterable(nsd, "constituent-vnfd"): vnfd_id = c_vnf["vnfd-id-ref"] + vnfd = db_vnfds_ref[vnfd_id] member_vnf_index = str(c_vnf["member-vnf-index"]) db_vnfr = db_vnfrs[member_vnf_index] base_folder = vnfd["_admin"]["storage"] vdu_id = None vdu_index = 0 vdu_name = None - vnfd = db_vnfds_ref[vnfd_id] # Get additional parameters deploy_params = {} @@ -1129,7 +1160,23 @@ class NsLcm(LcmBase): descriptor_config = vnfd.get("vnf-configuration") if descriptor_config and descriptor_config.get("juju"): - _deploy_n2vc() + self._deploy_n2vc( + logging_text=logging_text, + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_list=task_instantiation_list + ) # Deploy charms for each VDU that supports one. for vdud in get_iterable(vnfd, 'vdu'): @@ -1147,7 +1194,23 @@ class NsLcm(LcmBase): vdu_name = None for vdu_index in range(int(vdud.get("count", 1))): # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"] - _deploy_n2vc() + self._deploy_n2vc( + logging_text=logging_text, + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_list=task_instantiation_list + ) # Check if this NS has a charm configuration descriptor_config = nsd.get("ns-configuration") @@ -1166,16 +1229,34 @@ class NsLcm(LcmBase): if isinstance(v, str) and v.startswith("!!yaml "): deploy_params[k] = yaml.safe_load(v[7:]) base_folder = nsd["_admin"]["storage"] - _deploy_n2vc() + self._deploy_n2vc( + logging_text=logging_text, + db_nsr=db_nsr, + db_vnfr=db_vnfr, + nslcmop_id=nslcmop_id, + nsr_id=nsr_id, + nsi_id=nsi_id, + vnfd_id=vnfd_id, + vdu_id=vdu_id, + member_vnf_index=member_vnf_index, + vdu_index=vdu_index, + vdu_name=vdu_name, + deploy_params=deploy_params, + descriptor_config=descriptor_config, + base_folder=base_folder, + task_instantiation_list=task_instantiation_list + ) # Wait until all tasks of "task_instantiation_list" have been finished # while time() <= start_deploy + self.total_deploy_timeout: error_text = None timeout = 3600 # time() - start_deploy - task_instantiation_set = set(task_instantiation_list) - done, pending = await asyncio.wait(task_instantiation_set, timeout=timeout) - # TODO return_when=asyncio.FIRST_COMPLETED) + task_instantiation_set = set(task_instantiation_list) # build a set with tasks + done = None + pending = None + if len(task_instantiation_set) > 0: + done, pending = await asyncio.wait(task_instantiation_set, timeout=timeout) if pending: error_text = "timeout" for task in done: @@ -1242,33 +1323,55 @@ class NsLcm(LcmBase): self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate") - async def _destroy_charm(self, model, application): - """ - Order N2VC destroy a charm - :param model: - :param application: - :return: True if charm does not exist. False if it exist - """ - if not await self.n2vc.HasApplication(model, application): - return True # Already removed - await self.n2vc.RemoveCharms(model, application) - return False - - async def _wait_charm_destroyed(self, model, application, timeout): - """ - Wait until charm does not exist - :param model: - :param application: - :param timeout: - :return: True if not exist, False if timeout - """ - while True: - if not await self.n2vc.HasApplication(model, application): - return True - if timeout < 0: - return False - await asyncio.sleep(10, loop=self.loop) - timeout -= 10 + def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id, + member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config, + base_folder, task_instantiation_list): + # launch instantiate_N2VC in a asyncio task and register task object + # Look where information of this charm is at database ._admin.deployed.VCA + # if not found, create one entry and update database + + # fill db_nsr._admin.deployed.VCA. + vca_index = -1 + for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]): + if not vca_deployed: + continue + if vca_deployed.get("member-vnf-index") == member_vnf_index and \ + vca_deployed.get("vdu_id") == vdu_id and \ + vca_deployed.get("vdu_count_index", 0) == vdu_index: + break + else: + # not found, create one. + vca_deployed = { + "member-vnf-index": member_vnf_index, + "vdu_id": vdu_id, + "vdu_count_index": vdu_index, + "operational-status": "init", # TODO revise + "detailed-status": "", # TODO revise + "step": "initial-deploy", # TODO revise + "vnfd_id": vnfd_id, + "vdu_name": vdu_name, + } + vca_index += 1 + self.update_db_2("nsrs", nsr_id, {"_admin.deployed.VCA.{}".format(vca_index): vca_deployed}) + db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed) + + # Launch task + task_n2vc = asyncio.ensure_future( + self.instantiate_N2VC( + logging_text=logging_text, + vca_index=vca_index, + nsi_id=nsi_id, + db_nsr=db_nsr, + db_vnfr=db_vnfr, + vdu_id=vdu_id, + vdu_index=vdu_index, + deploy_params=deploy_params, + config_descriptor=descriptor_config, + base_folder=base_folder, + ) + ) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc) + task_instantiation_list.append(task_n2vc) # Check if this VNFD has a configured terminate action def _has_terminate_config_primitive(self, vnfd): @@ -1382,9 +1485,8 @@ class NsLcm(LcmBase): # 'detailed-status' : status message # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE' # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations. - def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, - vdu_name, primitive, mapped_primitive_params, - operationState=None, detailed_status=None, operationType=None, + def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive, + mapped_primitive_params, operationState=None, detailed_status=None, operationType=None, RO_nsr_id=None, RO_scaling_info=None): if not (db_nslcmop): return self.SUBOPERATION_STATUS_NOT_FOUND @@ -1427,9 +1529,8 @@ class NsLcm(LcmBase): # a. New: First time execution, return SUBOPERATION_STATUS_NEW # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP # c. Reintent: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute - def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, - vnf_config_primitive, primitive_params, operationType, - RO_nsr_id=None, RO_scaling_info=None): + def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, + operationType, RO_nsr_id=None, RO_scaling_info=None): # Find this sub-operation if (RO_nsr_id and RO_scaling_info): operationType = 'SCALE-RO' @@ -1527,21 +1628,33 @@ class NsLcm(LcmBase): primitive, mapped_primitive_params) # Sub-operations: Call _ns_execute_primitive() instead of action() - db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) - nsr_deployed = db_nsr["_admin"]["deployed"] - result, result_detail = await self._ns_execute_primitive( - nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive, - mapped_primitive_params) + # db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + # nsr_deployed = db_nsr["_admin"]["deployed"] # nslcmop_operation_state, nslcmop_operation_state_detail = await self.action( # nsr_id, nslcmop_terminate_action_id) # Launch Exception if action() returns other than ['COMPLETED', 'PARTIALLY_COMPLETED'] - result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED'] - if result not in result_ok: + # result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED'] + # if result not in result_ok: + # raise LcmException( + # "terminate_primitive_action for vnf_member_index={}", + # " primitive={} fails with error {}".format( + # vnf_index, seq.get("name"), result_detail)) + + # TODO: find ee_id + ee_id = None + try: + await self.n2vc.exec_primitive( + ee_id=ee_id, + primitive_name=primitive, + params_dict=mapped_primitive_params + ) + except Exception as e: + self.logger.error('Error executing primitive {}: {}'.format(primitive, e)) raise LcmException( - "terminate_primitive_action for vnf_member_index={}", - " primitive={} fails with error {}".format( - vnf_index, seq.get("name"), result_detail)) + "terminate_primitive_action for vnf_member_index={}, primitive={} fails with error {}" + .format(vnf_index, seq.get("name"), e), + ) async def terminate(self, nsr_id, nslcmop_id): @@ -1556,7 +1669,6 @@ class NsLcm(LcmBase): db_nslcmop = None exc = None failed_detail = [] # annotates all failed error messages - vca_time_destroy = None # time of where destroy charm order db_nsr_update = {"_admin.nslcmop": nslcmop_id} db_nslcmop_update = {} nslcmop_operation_state = None @@ -1578,44 +1690,22 @@ class NsLcm(LcmBase): # Call internal terminate action await self._terminate_action(db_nslcmop, nslcmop_id, nsr_id) + pending_tasks = [] + db_nsr_update["operational-status"] = "terminating" db_nsr_update["config-status"] = "terminating" - if nsr_deployed and nsr_deployed.get("VCA-model-name"): - vca_model_name = nsr_deployed["VCA-model-name"] - step = "deleting VCA model name '{}' and all charms".format(vca_model_name) + # remove NS + try: + step = "delete execution environment" self.logger.debug(logging_text + step) - try: - await self.n2vc.DestroyNetworkService(vca_model_name) - except NetworkServiceDoesNotExist: - pass - db_nsr_update["_admin.deployed.VCA-model-name"] = None - if nsr_deployed.get("VCA"): - for vca_index in range(0, len(nsr_deployed["VCA"])): - db_nsr_update["_admin.deployed.VCA.{}".format(vca_index)] = None - self.update_db_2("nsrs", nsr_id, db_nsr_update) - # for backward compatibility if charm have been created with "default" model name delete one by one - elif nsr_deployed and nsr_deployed.get("VCA"): - try: - step = "Scheduling configuration charms removing" - db_nsr_update["detailed-status"] = "Deleting charms" - self.logger.debug(logging_text + step) - self.update_db_2("nsrs", nsr_id, db_nsr_update) - # for backward compatibility - if isinstance(nsr_deployed["VCA"], dict): - nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values()) - db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"] - self.update_db_2("nsrs", nsr_id, db_nsr_update) - - for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]): - if vca_deployed: - if await self._destroy_charm(vca_deployed['model'], vca_deployed["application"]): - vca_deployed.clear() - db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None - else: - vca_time_destroy = time() - except Exception as e: - self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e)) + + task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id)) + pending_tasks.append(task_delete_ee) + except Exception as e: + msg = "Failed while deleting NS in VCA: {}".format(e) + self.logger.error(msg) + failed_detail.append(msg) # remove from RO RO_fail = False @@ -1646,8 +1736,11 @@ class NsLcm(LcmBase): delete_timeout = 20 * 60 # 20 minutes while delete_timeout > 0: - desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action", - extra_item_id=RO_delete_action) + desc = await self.RO.show( + "ns", + item_id_name=RO_nsr_id, + extra_item="action", + extra_item_id=RO_delete_action) ns_status, ns_status_info = self.RO.check_action_status(desc) if ns_status == "ERROR": raise ROclient.ROClientException(ns_status_info) @@ -1729,24 +1822,6 @@ class NsLcm(LcmBase): failed_detail.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e)) self.logger.error(logging_text + failed_detail[-1]) - # wait until charm deleted - if vca_time_destroy: - db_nsr_update["detailed-status"] = db_nslcmop_update["detailed-status"] = step = \ - "Waiting for deletion of configuration charms" - self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update) - self.update_db_2("nsrs", nsr_id, db_nsr_update) - for vca_index, vca_deployed in enumerate(nsr_deployed["VCA"]): - if not vca_deployed: - continue - step = "Waiting for deletion of charm application_name={}".format(vca_deployed["application"]) - timeout = self.timeout_charm_delete - int(time() - vca_time_destroy) - if not await self._wait_charm_destroyed(vca_deployed['model'], vca_deployed["application"], - timeout): - failed_detail.append("VCA[application_name={}] Deletion timeout".format( - vca_deployed["application"])) - else: - db_nsr["_admin.deployed.VCA.{}".format(vca_index)] = None - if failed_detail: self.logger.error(logging_text + " ;".join(failed_detail)) db_nsr_update["operational-status"] = "failed" @@ -1794,6 +1869,18 @@ class NsLcm(LcmBase): loop=self.loop) except Exception as e: self.logger.error(logging_text + "kafka_write notification Exception {}".format(e)) + + # wait for pending tasks + done = None + pending = None + if pending_tasks: + self.logger.debug(logging_text + 'Waiting for terminate pending tasks...') + done, pending = await asyncio.wait(pending_tasks, timeout=3600) + if not pending: + self.logger.debug(logging_text + 'All tasks finished...') + else: + self.logger.info(logging_text + 'There are pending tasks: {}'.format(pending)) + self.logger.debug(logging_text + "Exit") self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate") @@ -1836,8 +1923,9 @@ class NsLcm(LcmBase): return calculated_params async def _ns_execute_primitive(self, db_deployed, member_vnf_index, vdu_id, vdu_name, vdu_count_index, - primitive, primitive_params, retries=0, retries_interval=30): - start_primitive_time = time() + primitive, primitive_params, retries=0, retries_interval=30) -> str, str: + + # find vca_deployed record for this action try: for vca_deployed in db_deployed["VCA"]: if not vca_deployed: @@ -1850,55 +1938,42 @@ class NsLcm(LcmBase): continue break else: + # vca_deployed not found raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} is not " "deployed".format(member_vnf_index, vdu_id, vdu_name, vdu_count_index)) - model_name = vca_deployed.get("model") - application_name = vca_deployed.get("application") - if not model_name or not application_name: + + # get ee_id + ee_id = vca_deployed.get("ee_id") + if not ee_id: raise LcmException("charm for member_vnf_index={} vdu_id={} vdu_name={} vdu_count_index={} has not " - "model or application name" .format(member_vnf_index, vdu_id, vdu_name, - vdu_count_index)) - # if vca_deployed["operational-status"] != "active": - # raise LcmException("charm for member_vnf_index={} vdu_id={} operational_status={} not 'active'".format( - # member_vnf_index, vdu_id, vca_deployed["operational-status"])) - callback = None # self.n2vc_callback - callback_args = () # [db_nsr, db_nslcmop, member_vnf_index, None] - await self.n2vc.login() + "execution environment" + .format(member_vnf_index, vdu_id, vdu_name, vdu_count_index)) + if primitive == "config": primitive_params = {"params": primitive_params} + while retries >= 0: - primitive_id = await self.n2vc.ExecutePrimitive( - model_name, - application_name, - primitive, - callback, - *callback_args, - **primitive_params - ) - while time() - start_primitive_time < self.timeout_primitive: - primitive_result_ = await self.n2vc.GetPrimitiveStatus(model_name, primitive_id) - if primitive_result_ in ("completed", "failed"): - primitive_result = "COMPLETED" if primitive_result_ == "completed" else "FAILED" - detailed_result = await self.n2vc.GetPrimitiveOutput(model_name, primitive_id) - break - elif primitive_result_ is None and primitive == "config": - primitive_result = "COMPLETED" - detailed_result = None - break - else: # ("running", "pending", None): - pass - await asyncio.sleep(5, loop=self.loop) - else: - raise LcmException("timeout after {} seconds".format(self.timeout_primitive)) - if primitive_result == "COMPLETED": + try: + output = await self.n2vc.exec_primitive( + ee_id=ee_id, + primitive_name=primitive, + params_dict=primitive_params + ) + # execution was OK break - retries -= 1 - if retries >= 0: - await asyncio.sleep(retries_interval, loop=self.loop) + except Exception as e: + self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e)) + retries -= 1 + if retries >= 0: + # wait and retry + await asyncio.sleep(retries_interval, loop=self.loop) + else: + return 'Cannot execute action {} on {}: {}'.format(primitive, ee_id, e), 'FAIL' + + return output, 'OK' - return primitive_result, detailed_result - except (N2VCPrimitiveExecutionFailed, LcmException) as e: - return "FAILED", str(e) + except Exception as e: + return 'Error executing action {}: {}'.format(primitive, e), 'FAIL' async def action(self, nsr_id, nslcmop_id): @@ -1985,13 +2060,25 @@ class NsLcm(LcmBase): desc_params.update(db_nsr["additionalParamsForNs"]) # TODO check if ns is in a proper status - result, result_detail = await self._ns_execute_primitive( - nsr_deployed, vnf_index, vdu_id, vdu_name, vdu_count_index, primitive, - self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)) - db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = result_detail + output, detail = await self._ns_execute_primitive( + db_deployed=nsr_deployed, + member_vnf_index=vnf_index, + vdu_id=vdu_id, + vdu_name=vdu_name, + vdu_count_index=vdu_count_index, + primitive=primitive, + primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)) + + detailed_status = output + if detail == 'OK': + result = 'COMPLETED' + else: + result = 'FAILED' + + db_nslcmop_update["detailed-status"] = nslcmop_operation_state_detail = detailed_status db_nslcmop_update["operationState"] = nslcmop_operation_state = result db_nslcmop_update["statusEnteredTime"] = time() - self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail)) + self.logger.debug(logging_text + " task Done with result {} {}".format(result, detailed_status)) return # database update is called inside finally except (DbException, LcmException) as e: @@ -2190,6 +2277,7 @@ class NsLcm(LcmBase): vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info} if db_vnfr.get("additionalParamsForVnf"): vnfr_params.update(db_vnfr["additionalParamsForVnf"]) + scale_process = "VCA" db_nsr_update["config-status"] = "configuring pre-scaling" primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params) @@ -2289,6 +2377,7 @@ class NsLcm(LcmBase): else: assert False, "ROclient.check_action_status returns unknown {}".format(ns_status) else: + if ns_status == "ERROR": raise ROclient.ROClientException(ns_status_info) elif ns_status == "BUILD":