X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fn2vc_conn.py;h=c0bb558a3a05e0eac260dcc5971fc280fde8ef31;hp=97b6188c8349950bf2a3155a2c006cce5b5f5499;hb=refs%2Fchanges%2F96%2F8996%2F2;hpb=2d413435b8530cf7b2c8e49cf8cf157679e72432 diff --git a/n2vc/n2vc_conn.py b/n2vc/n2vc_conn.py index 97b6188..c0bb558 100644 --- a/n2vc/n2vc_conn.py +++ b/n2vc/n2vc_conn.py @@ -23,24 +23,26 @@ import abc import asyncio +from enum import Enum +from http import HTTPStatus import os -import subprocess import shlex +import subprocess import time -from enum import Enum -from http import HTTPStatus -from n2vc.loggable import Loggable -from n2vc.exceptions import N2VCBadArgumentsException +from n2vc.exceptions import N2VCBadArgumentsException from osm_common.dbmongo import DbException +import yaml + +from n2vc.loggable import Loggable class N2VCDeploymentStatus(Enum): - PENDING = 'pending' - RUNNING = 'running' - COMPLETED = 'completed' - FAILED = 'failed' - UNKNOWN = 'unknown' + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + UNKNOWN = "unknown" class N2VCConnector(abc.ABC, Loggable): @@ -50,35 +52,39 @@ class N2VCConnector(abc.ABC, Loggable): """ """ - ################################################################################################## - ########################################## P U B L I C ########################################### - ################################################################################################## + #################################################################################### + ################################### P U B L I C #################################### + #################################################################################### """ def __init__( - self, - db: object, - fs: object, - log: object, - loop: object, - url: str, - username: str, - vca_config: dict, - on_update_db = None + self, + db: object, + fs: object, + log: object, + loop: object, + url: str, + username: str, + vca_config: dict, + on_update_db=None, ): """Initialize N2VC abstract connector. It defines de API for VCA connectors :param object db: Mongo object managing the MongoDB (repo common DbBase) - :param object fs: FileSystem object managing the package artifacts (repo common FsBase) + :param object fs: FileSystem object managing the package artifacts (repo common + FsBase) :param object log: the logging object to log to :param object loop: the loop to use for asyncio (default current thread loop) - :param str url: a string that how to connect to the VCA (if needed, IP and port can be obtained from there) + :param str url: a string that how to connect to the VCA (if needed, IP and port + can be obtained from there) :param str username: the username to authenticate with VCA - :param dict vca_config: Additional parameters for the specific VCA. For example, for juju it will contain: + :param dict vca_config: Additional parameters for the specific VCA. For example, + for juju it will contain: secret: The password to authenticate with public_key: The contents of the juju public SSH key ca_cert str: The CA certificate used to authenticate - :param on_update_db: callback called when n2vc connector updates database. Received arguments: + :param on_update_db: callback called when n2vc connector updates database. + Received arguments: table: e.g. "nsrs" filter: e.g. {_id: } path: e.g. "_admin.deployed.VCA.3." @@ -86,15 +92,26 @@ class N2VCConnector(abc.ABC, Loggable): """ # parent class - Loggable.__init__(self, log=log, log_to_console=True, prefix='\nN2VC') + Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC") # check arguments if db is None: - raise N2VCBadArgumentsException('Argument db is mandatory', ['db']) + raise N2VCBadArgumentsException("Argument db is mandatory", ["db"]) if fs is None: - raise N2VCBadArgumentsException('Argument fs is mandatory', ['fs']) - - self.info('url={}, username={}, vca_config={}'.format(url, username, vca_config)) + raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"]) + + self.log.info( + "url={}, username={}, vca_config={}".format( + url, + username, + { + k: v + for k, v in vca_config.items() + if k + not in ("host", "port", "user", "secret", "public_key", "ca_cert") + }, + ) + ) # store arguments into self self.db = db @@ -106,31 +123,35 @@ class N2VCConnector(abc.ABC, Loggable): self.on_update_db = on_update_db # generate private/public key-pair + self.private_key_path = None + self.public_key_path = None self.get_public_key() @abc.abstractmethod - async def get_status(self, namespace: str): + async def get_status(self, namespace: str, yaml_format: bool = True): """Get namespace status :param namespace: we obtain ns from namespace + :param yaml_format: returns a yaml string """ # TODO: review which public key - async def get_public_key(self) -> str: + def get_public_key(self) -> str: """Get the VCA ssh-public-key - Returns the SSH public key from local mahine, to be injected into virtual machines to - be managed by the VCA. + Returns the SSH public key from local mahine, to be injected into virtual + machines to be managed by the VCA. First run, a ssh keypair will be created. The public key is injected into a VM so that we can provision the - machine with Juju, after which Juju will communicate with the VM + machine with Juju, after which Juju will communicate with the VM directly via the juju agent. """ - public_key = '' - # Find the path where we expect our key lives (~/.ssh) - homedir = os.environ['HOME'] + homedir = os.environ.get("HOME") + if not homedir: + self.warning("No HOME environment variable, using /tmp") + homedir = "/tmp" sshdir = "{}/.ssh".format(homedir) if not os.path.exists(sshdir): os.mkdir(sshdir) @@ -141,9 +162,7 @@ class N2VCConnector(abc.ABC, Loggable): # If we don't have a key generated, then we have to generate it using ssh-keygen if not os.path.exists(self.private_key_path): cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format( - "rsa", - "4096", - self.private_key_path + "rsa", "4096", self.private_key_path ) # run command with arguments subprocess.check_output(shlex.split(cmd)) @@ -161,20 +180,24 @@ class N2VCConnector(abc.ABC, Loggable): db_dict: dict, reuse_ee_id: str = None, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> (str, dict): - """Create an Execution Environment. Returns when it is created or raises an exception on failing + """Create an Execution Environment. Returns when it is created or raises an + exception on failing :param str namespace: Contains a dot separate string. LCM will use: []...[-] :param dict db_dict: where to write to database when the status changes. It contains a dictionary with {collection: str, filter: {}, path: str}, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} - :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an older environment + e.g. {collection: "nsrs", filter: {_id: , path: + "_admin.deployed.VCA.3"} + :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an + older environment :param float progress_timeout: :param float total_timeout: :returns str, dict: id of the new execution environment and credentials for it - (credentials can contains hostname, username, etc depending on underlying cloud) + (credentials can contains hostname, username, etc depending on + underlying cloud) """ @abc.abstractmethod @@ -184,17 +207,20 @@ class N2VCConnector(abc.ABC, Loggable): credentials: dict, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> str: """ Register an existing execution environment at the VCA :param str namespace: same as create_execution_environment method - :param dict credentials: credentials to access the existing execution environment - (it can contains hostname, username, path to private key, etc depending on underlying cloud) + :param dict credentials: credentials to access the existing execution + environment + (it can contains hostname, username, path to private key, etc depending on + underlying cloud) :param dict db_dict: where to write to database when the status changes. It contains a dictionary with {collection: str, filter: {}, path: str}, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float progress_timeout: :param float total_timeout: :returns str: id of the execution environment @@ -207,19 +233,22 @@ class N2VCConnector(abc.ABC, Loggable): artifact_path: str, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ): """ Install the software inside the execution environment identified by ee_id - :param str ee_id: the id of the execution environment returned by create_execution_environment - or register_execution_environment - :param str artifact_path: where to locate the artifacts (parent folder) using the self.fs - the final artifact path will be a combination of this artifact_path and additional string from - the config_dict (e.g. charm name) + :param str ee_id: the id of the execution environment returned by + create_execution_environment or register_execution_environment + :param str artifact_path: where to locate the artifacts (parent folder) using + the self.fs + the final artifact path will be a combination of this artifact_path and + additional string from the config_dict (e.g. charm name) :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float progress_timeout: :param float total_timeout: """ @@ -230,34 +259,34 @@ class N2VCConnector(abc.ABC, Loggable): ee_id: str, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> str: """ - Generate a priv/pub key pair in the execution environment and return the public key + Generate a priv/pub key pair in the execution environment and return the public + key - :param str ee_id: the id of the execution environment returned by create_execution_environment - or register_execution_environment + :param str ee_id: the id of the execution environment returned by + create_execution_environment or register_execution_environment :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float progress_timeout: :param float total_timeout: :returns: public key of the execution environment - For the case of juju proxy charm ssh-layered, it is the one returned by 'get-ssh-public-key' - primitive. + For the case of juju proxy charm ssh-layered, it is the one + returned by 'get-ssh-public-key' primitive. It raises a N2VC exception if fails """ @abc.abstractmethod async def add_relation( - self, - ee_id_1: str, - ee_id_2: str, - endpoint_1: str, - endpoint_2: str + self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str ): """ - Add a relation between two Execution Environments (using their associated endpoints). + Add a relation between two Execution Environments (using their associated + endpoints). :param str ee_id_1: The id of the first execution environment :param str ee_id_2: The id of the second execution environment @@ -267,49 +296,43 @@ class N2VCConnector(abc.ABC, Loggable): # TODO @abc.abstractmethod - async def remove_relation( - self - ): + async def remove_relation(self): """ """ # TODO @abc.abstractmethod - async def deregister_execution_environments( - self - ): + async def deregister_execution_environments(self): """ """ @abc.abstractmethod async def delete_namespace( - self, - namespace: str, - db_dict: dict = None, - total_timeout: float = None + self, namespace: str, db_dict: dict = None, total_timeout: float = None ): """ Remove a network scenario and its execution environments :param namespace: []. :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float total_timeout: """ @abc.abstractmethod async def delete_execution_environment( - self, - ee_id: str, - db_dict: dict = None, - total_timeout: float = None + self, ee_id: str, db_dict: dict = None, total_timeout: float = None ): """ Delete an execution environment :param str ee_id: id of the execution environment to delete :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float total_timeout: """ @@ -321,18 +344,22 @@ class N2VCConnector(abc.ABC, Loggable): params_dict: dict, db_dict: dict = None, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> str: """ Execute a primitive in the execution environment - :param str ee_id: the one returned by create_execution_environment or register_execution_environment - :param str primitive_name: must be one defined in the software. There is one called 'config', - where, for the proxy case, the 'credentials' of VM are provided + :param str ee_id: the one returned by create_execution_environment or + register_execution_environment + :param str primitive_name: must be one defined in the software. There is one + called 'config', where, for the proxy case, the 'credentials' of VM are + provided :param dict params_dict: parameters of the action :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float progress_timeout: :param float total_timeout: :returns str: primitive result, if ok. It raises exceptions in case of fail @@ -344,9 +371,9 @@ class N2VCConnector(abc.ABC, Loggable): """ """ - ################################################################################################## - ########################################## P R I V A T E ######################################### - ################################################################################################## + #################################################################################### + ################################### P R I V A T E ################################## + #################################################################################### """ def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str): @@ -359,10 +386,12 @@ class N2VCConnector(abc.ABC, Loggable): # check parameters if namespace is None or len(namespace) == 0: - raise N2VCBadArgumentsException('Argument namespace is mandatory', ['namespace']) + raise N2VCBadArgumentsException( + "Argument namespace is mandatory", ["namespace"] + ) # split namespace components - parts = namespace.split('.') + parts = namespace.split(".") nsi_id = None ns_id = None vnf_id = None @@ -376,7 +405,7 @@ class N2VCConnector(abc.ABC, Loggable): vnf_id = parts[2] if len(parts) > 3 and len(parts[3]) > 0: vdu_id = parts[3] - vdu_parts = parts[3].split('-') + vdu_parts = parts[3].split("-") if len(vdu_parts) > 1: vdu_id = vdu_parts[0] vdu_count = vdu_parts[1] @@ -384,79 +413,109 @@ class N2VCConnector(abc.ABC, Loggable): return nsi_id, ns_id, vnf_id, vdu_id, vdu_count async def write_app_status_to_db( - self, - db_dict: dict, - status: N2VCDeploymentStatus, - detailed_status: str, - vca_status: str, - entity_type: str + self, + db_dict: dict, + status: N2VCDeploymentStatus, + detailed_status: str, + vca_status: str, + entity_type: str, ): if not db_dict: - self.debug('No db_dict => No database write') + self.log.debug("No db_dict => No database write") return - self.debug('status={} / detailed-status={} / VCA-status={} / entity_type={}' - .format(str(status.value), detailed_status, vca_status, entity_type)) + # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}' + # .format(str(status.value), detailed_status, vca_status, entity_type)) try: - the_table = db_dict['collection'] - the_filter = db_dict['filter'] - the_path = db_dict['path'] - if not the_path[-1] == '.': - the_path = the_path + '.' + the_table = db_dict["collection"] + the_filter = db_dict["filter"] + the_path = db_dict["path"] + if not the_path[-1] == ".": + the_path = the_path + "." update_dict = { - the_path + 'status': str(status.value), - the_path + 'detailed-status': detailed_status, - the_path + 'VCA-status': vca_status, - the_path + 'entity-type': entity_type, - the_path + 'status-time': str(time.time()), + the_path + "status": str(status.value), + the_path + "detailed-status": detailed_status, + the_path + "VCA-status": vca_status, + the_path + "entity-type": entity_type, + the_path + "status-time": str(time.time()), } self.db.set_one( table=the_table, q_filter=the_filter, update_dict=update_dict, - fail_on_empty=True + fail_on_empty=True, ) # database callback if self.on_update_db: if asyncio.iscoroutinefunction(self.on_update_db): - await self.on_update_db(the_table, the_filter, the_path, update_dict) + await self.on_update_db( + the_table, the_filter, the_path, update_dict + ) else: self.on_update_db(the_table, the_filter, the_path, update_dict) except DbException as e: if e.http_code == HTTPStatus.NOT_FOUND: - self.error('NOT_FOUND error: Exception writing status to database: {}'.format(e)) + self.log.error( + "NOT_FOUND error: Exception writing status to database: {}".format( + e + ) + ) else: - self.info('Exception writing status to database: {}'.format(e)) + self.log.info("Exception writing status to database: {}".format(e)) -def juju_status_2_osm_status(type: str, status: str) -> N2VCDeploymentStatus: - if type == 'application' or type == 'unit': - if status in ['waiting', 'maintenance']: +def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus: + if statustype == "application" or statustype == "unit": + if status in ["waiting", "maintenance"]: return N2VCDeploymentStatus.RUNNING - elif status in ['active']: + if status in ["error"]: + return N2VCDeploymentStatus.FAILED + elif status in ["active"]: return N2VCDeploymentStatus.COMPLETED - elif status in ['blocked']: + elif status in ["blocked"]: return N2VCDeploymentStatus.RUNNING else: return N2VCDeploymentStatus.UNKNOWN - elif type == 'action': - if status in ['running']: + elif statustype == "action": + if status in ["running"]: return N2VCDeploymentStatus.RUNNING - elif status in ['completed']: + elif status in ["completed"]: return N2VCDeploymentStatus.COMPLETED else: return N2VCDeploymentStatus.UNKNOWN - elif type == 'machine': - if status in ['pending']: + elif statustype == "machine": + if status in ["pending"]: return N2VCDeploymentStatus.PENDING - elif status in ['started']: + elif status in ["started"]: return N2VCDeploymentStatus.COMPLETED else: return N2VCDeploymentStatus.UNKNOWN return N2VCDeploymentStatus.FAILED + + +def obj_to_yaml(obj: object) -> str: + # dump to yaml + dump_text = yaml.dump(obj, default_flow_style=False, indent=2) + # split lines + lines = dump_text.splitlines() + # remove !!python/object tags + yaml_text = "" + for line in lines: + index = line.find("!!python/object") + if index >= 0: + line = line[:index] + yaml_text += line + "\n" + return yaml_text + + +def obj_to_dict(obj: object) -> dict: + # convert obj to yaml + yaml_text = obj_to_yaml(obj) + # parse to dict + return yaml.load(yaml_text, Loader=yaml.Loader)