| ## |
| # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U. |
| # This file is part of OSM |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| # implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact with: nfvlabs@tid.es |
| ## |
| |
| |
| import abc |
| import asyncio |
| from http import HTTPStatus |
| import os |
| import shlex |
| import subprocess |
| import time |
| |
| from n2vc.exceptions import N2VCBadArgumentsException |
| from osm_common.dbmongo import DbException |
| import yaml |
| |
| from n2vc.loggable import Loggable |
| from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus |
| |
| |
| class N2VCConnector(abc.ABC, Loggable): |
| """Generic N2VC connector |
| |
| Abstract class |
| """ |
| |
| """ |
| #################################################################################### |
| ################################### P U B L I C #################################### |
| #################################################################################### |
| """ |
| |
| def __init__( |
| self, |
| db: object, |
| fs: object, |
| log: object, |
| on_update_db=None, |
| **kwargs, |
| ): |
| """Initialize N2VC abstract connector. It defines de API for VCA connectors |
| |
| :param object db: Mongo object managing the MongoDB (repo common DbBase) |
| :param object fs: FileSystem object managing the package artifacts (repo common |
| FsBase) |
| :param object log: the logging object to log to |
| :param on_update_db: callback called when n2vc connector updates database. |
| Received arguments: |
| table: e.g. "nsrs" |
| filter: e.g. {_id: <nsd-id> } |
| path: e.g. "_admin.deployed.VCA.3." |
| updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }" |
| """ |
| |
| # parent class |
| Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC") |
| |
| # check arguments |
| if db is None: |
| raise N2VCBadArgumentsException("Argument db is mandatory", ["db"]) |
| if fs is None: |
| raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"]) |
| |
| # store arguments into self |
| self.db = db |
| self.fs = fs |
| self.on_update_db = on_update_db |
| |
| # generate private/public key-pair |
| self.private_key_path = None |
| self.public_key_path = None |
| |
| @abc.abstractmethod |
| async def get_status(self, namespace: str, yaml_format: bool = True): |
| """Get namespace status |
| |
| :param namespace: we obtain ns from namespace |
| :param yaml_format: returns a yaml string |
| """ |
| |
| # TODO: review which public key |
| def get_public_key(self) -> str: |
| """Get the VCA ssh-public-key |
| |
| Returns the SSH public key from local mahine, to be injected into virtual |
| machines to be managed by the VCA. |
| First run, a ssh keypair will be created. |
| The public key is injected into a VM so that we can provision the |
| machine with Juju, after which Juju will communicate with the VM |
| directly via the juju agent. |
| """ |
| |
| # Find the path where we expect our key lives (~/.ssh) |
| homedir = os.environ.get("HOME") |
| if not homedir: |
| self.log.warning("No HOME environment variable, using /tmp") |
| homedir = "/tmp" |
| sshdir = "{}/.ssh".format(homedir) |
| sshdir = os.path.realpath(os.path.normpath(os.path.abspath(sshdir))) |
| if not os.path.exists(sshdir): |
| os.mkdir(sshdir) |
| |
| self.private_key_path = "{}/id_n2vc_rsa".format(sshdir) |
| self.private_key_path = os.path.realpath( |
| os.path.normpath(os.path.abspath(self.private_key_path)) |
| ) |
| self.public_key_path = "{}.pub".format(self.private_key_path) |
| self.public_key_path = os.path.realpath( |
| os.path.normpath(os.path.abspath(self.public_key_path)) |
| ) |
| |
| # If we don't have a key generated, then we have to generate it using ssh-keygen |
| if not os.path.exists(self.private_key_path): |
| command = "ssh-keygen -t {} -b {} -N '' -f {}".format( |
| "rsa", "4096", self.private_key_path |
| ) |
| # run command with arguments |
| args = shlex.split(command) |
| subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| |
| # Read the public key. Only one public key (one line) in the file |
| with open(self.public_key_path, "r") as file: |
| public_key = file.readline() |
| |
| return public_key |
| |
| @abc.abstractmethod |
| async def create_execution_environment( |
| self, |
| namespace: str, |
| db_dict: dict, |
| reuse_ee_id: str = None, |
| progress_timeout: float = None, |
| total_timeout: float = None, |
| ) -> (str, dict): |
| """Create an Execution Environment. Returns when it is created or raises an |
| exception on failing |
| |
| :param str namespace: Contains a dot separate string. |
| LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>] |
| :param dict db_dict: where to write to database when the status changes. |
| It contains a dictionary with {collection: str, filter: {}, path: str}, |
| e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: |
| "_admin.deployed.VCA.3"} |
| :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an |
| older environment |
| :param float progress_timeout: |
| :param float total_timeout: |
| :returns str, dict: id of the new execution environment and credentials for it |
| (credentials can contains hostname, username, etc depending on |
| underlying cloud) |
| """ |
| |
| @abc.abstractmethod |
| async def register_execution_environment( |
| self, |
| namespace: str, |
| credentials: dict, |
| db_dict: dict, |
| progress_timeout: float = None, |
| total_timeout: float = None, |
| ) -> str: |
| """ |
| Register an existing execution environment at the VCA |
| |
| :param str namespace: same as create_execution_environment method |
| :param dict credentials: credentials to access the existing execution |
| environment |
| (it can contains hostname, username, path to private key, etc depending on |
| underlying cloud) |
| :param dict db_dict: where to write to database when the status changes. |
| It contains a dictionary with {collection: str, filter: {}, path: str}, |
| e.g. {collection: "nsrs", filter: |
| {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} |
| :param float progress_timeout: |
| :param float total_timeout: |
| :returns str: id of the execution environment |
| """ |
| |
| @abc.abstractmethod |
| async def install_configuration_sw( |
| self, |
| ee_id: str, |
| artifact_path: str, |
| db_dict: dict, |
| progress_timeout: float = None, |
| total_timeout: float = None, |
| ): |
| """ |
| Install the software inside the execution environment identified by ee_id |
| |
| :param str ee_id: the id of the execution environment returned by |
| create_execution_environment or register_execution_environment |
| :param str artifact_path: where to locate the artifacts (parent folder) using |
| the self.fs |
| the final artifact path will be a combination of this artifact_path and |
| additional string from the config_dict (e.g. charm name) |
| :param dict db_dict: where to write into database when the status changes. |
| It contains a dict with |
| {collection: <str>, filter: {}, path: <str>}, |
| e.g. {collection: "nsrs", filter: |
| {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} |
| :param float progress_timeout: |
| :param float total_timeout: |
| """ |
| |
| @abc.abstractmethod |
| async def install_k8s_proxy_charm( |
| self, |
| charm_name: str, |
| namespace: str, |
| artifact_path: str, |
| db_dict: dict, |
| progress_timeout: float = None, |
| total_timeout: float = None, |
| config: dict = None, |
| ) -> str: |
| """ |
| Install a k8s proxy charm |
| |
| :param charm_name: Name of the charm being deployed |
| :param namespace: collection of all the uuids related to the charm. |
| :param str artifact_path: where to locate the artifacts (parent folder) using |
| the self.fs |
| the final artifact path will be a combination of this artifact_path and |
| additional string from the config_dict (e.g. charm name) |
| :param dict db_dict: where to write into database when the status changes. |
| It contains a dict with |
| {collection: <str>, filter: {}, path: <str>}, |
| e.g. {collection: "nsrs", filter: |
| {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} |
| :param float progress_timeout: |
| :param float total_timeout: |
| :param config: Dictionary with additional configuration |
| |
| :returns ee_id: execution environment id. |
| """ |
| |
| @abc.abstractmethod |
| async def get_ee_ssh_public__key( |
| self, |
| ee_id: str, |
| db_dict: dict, |
| progress_timeout: float = None, |
| total_timeout: float = None, |
| ) -> str: |
| """ |
| Generate a priv/pub key pair in the execution environment and return the public |
| key |
| |
| :param str ee_id: the id of the execution environment returned by |
| create_execution_environment or register_execution_environment |
| :param dict db_dict: where to write into database when the status changes. |
| It contains a dict with |
| {collection: <str>, filter: {}, path: <str>}, |
| e.g. {collection: "nsrs", filter: |
| {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} |
| :param float progress_timeout: |
| :param float total_timeout: |
| :returns: public key of the execution environment |
| For the case of juju proxy charm ssh-layered, it is the one |
| returned by 'get-ssh-public-key' primitive. |
| It raises a N2VC exception if fails |
| """ |
| |
| @abc.abstractmethod |
| async def add_relation( |
| self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str |
| ): |
| """ |
| Add a relation between two Execution Environments (using their associated |
| endpoints). |
| |
| :param str ee_id_1: The id of the first execution environment |
| :param str ee_id_2: The id of the second execution environment |
| :param str endpoint_1: The endpoint in the first execution environment |
| :param str endpoint_2: The endpoint in the second execution environment |
| """ |
| |
| # TODO |
| @abc.abstractmethod |
| async def remove_relation(self): |
| """ """ |
| |
| # TODO |
| @abc.abstractmethod |
| async def deregister_execution_environments(self): |
| """ """ |
| |
| @abc.abstractmethod |
| async def delete_namespace( |
| self, namespace: str, db_dict: dict = None, total_timeout: float = None |
| ): |
| """ |
| Remove a network scenario and its execution environments |
| :param namespace: [<nsi-id>].<ns-id> |
| :param dict db_dict: where to write into database when the status changes. |
| It contains a dict with |
| {collection: <str>, filter: {}, path: <str>}, |
| e.g. {collection: "nsrs", filter: |
| {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} |
| :param float total_timeout: |
| """ |
| |
| @abc.abstractmethod |
| async def delete_execution_environment( |
| self, ee_id: str, db_dict: dict = None, total_timeout: float = None |
| ): |
| """ |
| Delete an execution environment |
| :param str ee_id: id of the execution environment to delete |
| :param dict db_dict: where to write into database when the status changes. |
| It contains a dict with |
| {collection: <str>, filter: {}, path: <str>}, |
| e.g. {collection: "nsrs", filter: |
| {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} |
| :param float total_timeout: |
| """ |
| |
| @abc.abstractmethod |
| async def upgrade_charm( |
| self, |
| ee_id: str = None, |
| path: str = None, |
| charm_id: str = None, |
| charm_type: str = None, |
| timeout: float = None, |
| ) -> str: |
| """This method upgrade charms in VNFs |
| |
| Args: |
| ee_id: Execution environment id |
| path: Local path to the charm |
| charm_id: charm-id |
| charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm |
| timeout: (Float) Timeout for the ns update operation |
| |
| Returns: |
| The output of the update operation if status equals to "completed" |
| """ |
| |
| @abc.abstractmethod |
| async def exec_primitive( |
| self, |
| ee_id: str, |
| primitive_name: str, |
| params_dict: dict, |
| db_dict: dict = None, |
| progress_timeout: float = None, |
| total_timeout: float = None, |
| ) -> str: |
| """ |
| Execute a primitive in the execution environment |
| |
| :param str ee_id: the one returned by create_execution_environment or |
| register_execution_environment |
| :param str primitive_name: must be one defined in the software. There is one |
| called 'config', where, for the proxy case, the 'credentials' of VM are |
| provided |
| :param dict params_dict: parameters of the action |
| :param dict db_dict: where to write into database when the status changes. |
| It contains a dict with |
| {collection: <str>, filter: {}, path: <str>}, |
| e.g. {collection: "nsrs", filter: |
| {_id: <nsd-id>, path: "_admin.deployed.VCA.3"} |
| :param float progress_timeout: |
| :param float total_timeout: |
| :returns str: primitive result, if ok. It raises exceptions in case of fail |
| """ |
| |
| async def disconnect(self): |
| """ |
| Disconnect from VCA |
| """ |
| |
| """ |
| #################################################################################### |
| ################################### P R I V A T E ################################## |
| #################################################################################### |
| """ |
| |
| def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str): |
| """ |
| Split namespace components |
| |
| :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>] |
| :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count |
| """ |
| |
| # check parameters |
| if namespace is None or len(namespace) == 0: |
| raise N2VCBadArgumentsException( |
| "Argument namespace is mandatory", ["namespace"] |
| ) |
| |
| # split namespace components |
| parts = namespace.split(".") |
| nsi_id = None |
| ns_id = None |
| vnf_id = None |
| vdu_id = None |
| vdu_count = None |
| if len(parts) > 0 and len(parts[0]) > 0: |
| nsi_id = parts[0] |
| if len(parts) > 1 and len(parts[1]) > 0: |
| ns_id = parts[1] |
| if len(parts) > 2 and len(parts[2]) > 0: |
| vnf_id = parts[2] |
| if len(parts) > 3 and len(parts[3]) > 0: |
| vdu_id = parts[3] |
| vdu_parts = parts[3].split("-") |
| if len(vdu_parts) > 1: |
| vdu_id = vdu_parts[0] |
| vdu_count = vdu_parts[1] |
| |
| return nsi_id, ns_id, vnf_id, vdu_id, vdu_count |
| |
| async def write_app_status_to_db( |
| self, |
| db_dict: dict, |
| status: N2VCDeploymentStatus, |
| detailed_status: str, |
| vca_status: str, |
| entity_type: str, |
| vca_id: str = None, |
| ): |
| """ |
| Write application status to database |
| |
| :param: db_dict: DB dictionary |
| :param: status: Status of the application |
| :param: detailed_status: Detailed status |
| :param: vca_status: VCA status |
| :param: entity_type: Entity type ("application", "machine, and "action") |
| :param: vca_id: Id of the VCA. If None, the default VCA will be used. |
| """ |
| if not db_dict: |
| self.log.debug("No db_dict => No database write") |
| return |
| |
| # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}' |
| # .format(str(status.value), detailed_status, vca_status, entity_type)) |
| |
| try: |
| the_table = db_dict["collection"] |
| the_filter = db_dict["filter"] |
| the_path = db_dict["path"] |
| if not the_path[-1] == ".": |
| the_path = the_path + "." |
| update_dict = { |
| the_path + "status": str(status.value), |
| the_path + "detailed-status": detailed_status, |
| the_path + "VCA-status": vca_status, |
| the_path + "entity-type": entity_type, |
| the_path + "status-time": str(time.time()), |
| } |
| |
| self.db.set_one( |
| table=the_table, |
| q_filter=the_filter, |
| update_dict=update_dict, |
| fail_on_empty=True, |
| ) |
| |
| # database callback |
| if self.on_update_db: |
| if asyncio.iscoroutinefunction(self.on_update_db): |
| await self.on_update_db( |
| the_table, the_filter, the_path, update_dict, vca_id=vca_id |
| ) |
| else: |
| self.on_update_db( |
| the_table, the_filter, the_path, update_dict, vca_id=vca_id |
| ) |
| |
| except DbException as e: |
| if e.http_code == HTTPStatus.NOT_FOUND: |
| self.log.error( |
| "NOT_FOUND error: Exception writing status to database: {}".format( |
| e |
| ) |
| ) |
| else: |
| self.log.info("Exception writing status to database: {}".format(e)) |
| |
| def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus: |
| if status not in JujuStatusToOSM[entity_type]: |
| self.log.warning("Status {} not found in JujuStatusToOSM.".format(status)) |
| return N2VCDeploymentStatus.UNKNOWN |
| return JujuStatusToOSM[entity_type][status] |
| |
| |
| def obj_to_yaml(obj: object) -> str: |
| # dump to yaml |
| dump_text = yaml.dump(obj, default_flow_style=False, indent=2) |
| # split lines |
| lines = dump_text.splitlines() |
| # remove !!python/object tags |
| yaml_text = "" |
| for line in lines: |
| index = line.find("!!python/object") |
| if index >= 0: |
| line = line[:index] |
| yaml_text += line + "\n" |
| return yaml_text |
| |
| |
| def obj_to_dict(obj: object) -> dict: |
| # convert obj to yaml |
| yaml_text = obj_to_yaml(obj) |
| # parse to dict |
| return yaml.load(yaml_text, Loader=yaml.SafeLoader) |