import abc
import asyncio
+from http import HTTPStatus
import os
-import subprocess
import shlex
+import subprocess
import time
-from enum import Enum
-from http import HTTPStatus
-from n2vc.loggable import Loggable
-from n2vc.exceptions import N2VCBadArgumentsException
+from n2vc.exceptions import N2VCBadArgumentsException
from osm_common.dbmongo import DbException
+import yaml
-
-class N2VCDeploymentStatus(Enum):
- PENDING = 'pending'
- RUNNING = 'running'
- COMPLETED = 'completed'
- FAILED = 'failed'
- UNKNOWN = 'unknown'
+from n2vc.loggable import Loggable
+from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
class N2VCConnector(abc.ABC, Loggable):
"""
"""
- ##################################################################################################
- ########################################## P U B L I C ###########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P U B L I C ####################################
+ ####################################################################################
"""
def __init__(
- self,
- db: object,
- fs: object,
- log: object,
- loop: object,
- url: str,
- username: str,
- vca_config: dict,
- on_update_db = None
+ self,
+ db: object,
+ fs: object,
+ log: object,
+ loop: object,
+ on_update_db=None,
+ **kwargs,
):
"""Initialize N2VC abstract connector. It defines de API for VCA connectors
:param object db: Mongo object managing the MongoDB (repo common DbBase)
- :param object fs: FileSystem object managing the package artifacts (repo common FsBase)
+ :param object fs: FileSystem object managing the package artifacts (repo common
+ FsBase)
:param object log: the logging object to log to
:param object loop: the loop to use for asyncio (default current thread loop)
- :param str url: a string that how to connect to the VCA (if needed, IP and port can be obtained from there)
- :param str username: the username to authenticate with VCA
- :param dict vca_config: Additional parameters for the specific VCA. For example, for juju it will contain:
- secret: The password to authenticate with
- public_key: The contents of the juju public SSH key
- ca_cert str: The CA certificate used to authenticate
- :param on_update_db: callback called when n2vc connector updates database. Received arguments:
+ :param on_update_db: callback called when n2vc connector updates database.
+ Received arguments:
table: e.g. "nsrs"
filter: e.g. {_id: <nsd-id> }
path: e.g. "_admin.deployed.VCA.3."
"""
# parent class
- Loggable.__init__(self, log=log, log_to_console=True, prefix='\nN2VC')
+ Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")
# check arguments
if db is None:
- raise N2VCBadArgumentsException('Argument db is mandatory', ['db'])
+ raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
if fs is None:
- raise N2VCBadArgumentsException('Argument fs is mandatory', ['fs'])
-
- self.info('url={}, username={}, vca_config={}'.format(url, username, vca_config))
+ raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])
# store arguments into self
self.db = db
self.fs = fs
self.loop = loop or asyncio.get_event_loop()
- self.url = url
- self.username = username
- self.vca_config = vca_config
self.on_update_db = on_update_db
# generate private/public key-pair
- self.get_public_key()
+ self.private_key_path = None
+ self.public_key_path = None
@abc.abstractmethod
- async def get_status(self, namespace: str):
+ async def get_status(self, namespace: str, yaml_format: bool = True):
"""Get namespace status
:param namespace: we obtain ns from namespace
+ :param yaml_format: returns a yaml string
"""
# TODO: review which public key
- async def get_public_key(self) -> str:
+ def get_public_key(self) -> str:
"""Get the VCA ssh-public-key
- Returns the SSH public key from local mahine, to be injected into virtual machines to
- be managed by the VCA.
+ Returns the SSH public key from local mahine, to be injected into virtual
+ machines to be managed by the VCA.
First run, a ssh keypair will be created.
The public key is injected into a VM so that we can provision the
- machine with Juju, after which Juju will communicate with the VM
+ machine with Juju, after which Juju will communicate with the VM
directly via the juju agent.
"""
- public_key = ''
-
# Find the path where we expect our key lives (~/.ssh)
- homedir = os.environ['HOME']
+ homedir = os.environ.get("HOME")
+ if not homedir:
+ self.log.warning("No HOME environment variable, using /tmp")
+ homedir = "/tmp"
sshdir = "{}/.ssh".format(homedir)
if not os.path.exists(sshdir):
os.mkdir(sshdir)
# If we don't have a key generated, then we have to generate it using ssh-keygen
if not os.path.exists(self.private_key_path):
cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
- "rsa",
- "4096",
- self.private_key_path
+ "rsa", "4096", self.private_key_path
)
# run command with arguments
subprocess.check_output(shlex.split(cmd))
db_dict: dict,
reuse_ee_id: str = None,
progress_timeout: float = None,
- total_timeout: float = None
+ total_timeout: float = None,
) -> (str, dict):
- """Create an Execution Environment. Returns when it is created or raises an exception on failing
+ """Create an Execution Environment. Returns when it is created or raises an
+ exception on failing
:param str namespace: Contains a dot separate string.
LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
:param dict db_dict: where to write to database when the status changes.
It contains a dictionary with {collection: str, filter: {}, path: str},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
- :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an older environment
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
+ "_admin.deployed.VCA.3"}
+ :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
+ older environment
:param float progress_timeout:
:param float total_timeout:
:returns str, dict: id of the new execution environment and credentials for it
- (credentials can contains hostname, username, etc depending on underlying cloud)
+ (credentials can contains hostname, username, etc depending on
+ underlying cloud)
"""
@abc.abstractmethod
credentials: dict,
db_dict: dict,
progress_timeout: float = None,
- total_timeout: float = None
+ total_timeout: float = None,
) -> str:
"""
Register an existing execution environment at the VCA
:param str namespace: same as create_execution_environment method
- :param dict credentials: credentials to access the existing execution environment
- (it can contains hostname, username, path to private key, etc depending on underlying cloud)
+ :param dict credentials: credentials to access the existing execution
+ environment
+ (it can contains hostname, username, path to private key, etc depending on
+ underlying cloud)
:param dict db_dict: where to write to database when the status changes.
It contains a dictionary with {collection: str, filter: {}, path: str},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
:param float progress_timeout:
:param float total_timeout:
:returns str: id of the execution environment
artifact_path: str,
db_dict: dict,
progress_timeout: float = None,
- total_timeout: float = None
+ total_timeout: float = None,
):
"""
Install the software inside the execution environment identified by ee_id
- :param str ee_id: the id of the execution environment returned by create_execution_environment
- or register_execution_environment
- :param str artifact_path: where to locate the artifacts (parent folder) using the self.fs
- the final artifact path will be a combination of this artifact_path and additional string from
- the config_dict (e.g. charm name)
+ :param str ee_id: the id of the execution environment returned by
+ create_execution_environment or register_execution_environment
+ :param str artifact_path: where to locate the artifacts (parent folder) using
+ the self.fs
+ the final artifact path will be a combination of this artifact_path and
+ additional string from the config_dict (e.g. charm name)
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ :param float progress_timeout:
+ :param float total_timeout:
+ """
+
+ @abc.abstractmethod
+ async def install_k8s_proxy_charm(
+ self,
+ charm_name: str,
+ namespace: str,
+ artifact_path: str,
+ db_dict: dict,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ config: dict = None,
+ ) -> str:
+ """
+ Install a k8s proxy charm
+
+ :param charm_name: Name of the charm being deployed
+ :param namespace: collection of all the uuids related to the charm.
+ :param str artifact_path: where to locate the artifacts (parent folder) using
+ the self.fs
+ the final artifact path will be a combination of this artifact_path and
+ additional string from the config_dict (e.g. charm name)
:param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {}, path: <str>},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
:param float progress_timeout:
:param float total_timeout:
+ :param config: Dictionary with additional configuration
+
+ :returns ee_id: execution environment id.
"""
@abc.abstractmethod
ee_id: str,
db_dict: dict,
progress_timeout: float = None,
- total_timeout: float = None
+ total_timeout: float = None,
) -> str:
"""
- Generate a priv/pub key pair in the execution environment and return the public key
+ Generate a priv/pub key pair in the execution environment and return the public
+ key
- :param str ee_id: the id of the execution environment returned by create_execution_environment
- or register_execution_environment
+ :param str ee_id: the id of the execution environment returned by
+ create_execution_environment or register_execution_environment
:param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {}, path: <str>},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
:param float progress_timeout:
:param float total_timeout:
:returns: public key of the execution environment
- For the case of juju proxy charm ssh-layered, it is the one returned by 'get-ssh-public-key'
- primitive.
+ For the case of juju proxy charm ssh-layered, it is the one
+ returned by 'get-ssh-public-key' primitive.
It raises a N2VC exception if fails
"""
@abc.abstractmethod
async def add_relation(
- self,
- ee_id_1: str,
- ee_id_2: str,
- endpoint_1: str,
- endpoint_2: str
+ self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
):
"""
- Add a relation between two Execution Environments (using their associated endpoints).
+ Add a relation between two Execution Environments (using their associated
+ endpoints).
:param str ee_id_1: The id of the first execution environment
:param str ee_id_2: The id of the second execution environment
# TODO
@abc.abstractmethod
- async def remove_relation(
- self
- ):
- """
- """
+ async def remove_relation(self):
+ """ """
# TODO
@abc.abstractmethod
- async def deregister_execution_environments(
- self
- ):
- """
- """
+ async def deregister_execution_environments(self):
+ """ """
@abc.abstractmethod
async def delete_namespace(
- self,
- namespace: str,
- db_dict: dict = None,
- total_timeout: float = None
+ self, namespace: str, db_dict: dict = None, total_timeout: float = None
):
"""
Remove a network scenario and its execution environments
:param namespace: [<nsi-id>].<ns-id>
:param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {}, path: <str>},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
:param float total_timeout:
"""
@abc.abstractmethod
async def delete_execution_environment(
- self,
- ee_id: str,
- db_dict: dict = None,
- total_timeout: float = None
+ self, ee_id: str, db_dict: dict = None, total_timeout: float = None
):
"""
Delete an execution environment
:param str ee_id: id of the execution environment to delete
:param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {}, path: <str>},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
:param float total_timeout:
"""
+ @abc.abstractmethod
+ async def upgrade_charm(
+ self,
+ ee_id: str = None,
+ path: str = None,
+ charm_id: str = None,
+ charm_type: str = None,
+ timeout: float = None,
+ ) -> str:
+ """This method upgrade charms in VNFs
+
+ Args:
+ ee_id: Execution environment id
+ path: Local path to the charm
+ charm_id: charm-id
+ charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+ timeout: (Float) Timeout for the ns update operation
+
+ Returns:
+ The output of the update operation if status equals to "completed"
+ """
+
@abc.abstractmethod
async def exec_primitive(
self,
params_dict: dict,
db_dict: dict = None,
progress_timeout: float = None,
- total_timeout: float = None
+ total_timeout: float = None,
) -> str:
"""
Execute a primitive in the execution environment
- :param str ee_id: the one returned by create_execution_environment or register_execution_environment
- :param str primitive_name: must be one defined in the software. There is one called 'config',
- where, for the proxy case, the 'credentials' of VM are provided
+ :param str ee_id: the one returned by create_execution_environment or
+ register_execution_environment
+ :param str primitive_name: must be one defined in the software. There is one
+ called 'config', where, for the proxy case, the 'credentials' of VM are
+ provided
:param dict params_dict: parameters of the action
:param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {}, path: <str>},
- e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
:param float progress_timeout:
:param float total_timeout:
:returns str: primitive result, if ok. It raises exceptions in case of fail
"""
"""
- ##################################################################################################
- ########################################## P R I V A T E #########################################
- ##################################################################################################
+ ####################################################################################
+ ################################### P R I V A T E ##################################
+ ####################################################################################
"""
def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
# check parameters
if namespace is None or len(namespace) == 0:
- raise N2VCBadArgumentsException('Argument namespace is mandatory', ['namespace'])
+ raise N2VCBadArgumentsException(
+ "Argument namespace is mandatory", ["namespace"]
+ )
# split namespace components
- parts = namespace.split('.')
+ parts = namespace.split(".")
nsi_id = None
ns_id = None
vnf_id = None
vnf_id = parts[2]
if len(parts) > 3 and len(parts[3]) > 0:
vdu_id = parts[3]
- vdu_parts = parts[3].split('-')
+ vdu_parts = parts[3].split("-")
if len(vdu_parts) > 1:
vdu_id = vdu_parts[0]
vdu_count = vdu_parts[1]
return nsi_id, ns_id, vnf_id, vdu_id, vdu_count
async def write_app_status_to_db(
- self,
- db_dict: dict,
- status: N2VCDeploymentStatus,
- detailed_status: str,
- vca_status: str,
- entity_type: str
+ self,
+ db_dict: dict,
+ status: N2VCDeploymentStatus,
+ detailed_status: str,
+ vca_status: str,
+ entity_type: str,
+ vca_id: str = None,
):
+ """
+ Write application status to database
+
+ :param: db_dict: DB dictionary
+ :param: status: Status of the application
+ :param: detailed_status: Detailed status
+ :param: vca_status: VCA status
+ :param: entity_type: Entity type ("application", "machine, and "action")
+ :param: vca_id: Id of the VCA. If None, the default VCA will be used.
+ """
if not db_dict:
- self.debug('No db_dict => No database write')
+ self.log.debug("No db_dict => No database write")
return
- self.debug('status={} / detailed-status={} / VCA-status={} / entity_type={}'
- .format(str(status.value), detailed_status, vca_status, entity_type))
+ # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}'
+ # .format(str(status.value), detailed_status, vca_status, entity_type))
try:
- the_table = db_dict['collection']
- the_filter = db_dict['filter']
- the_path = db_dict['path']
- if not the_path[-1] == '.':
- the_path = the_path + '.'
+ the_table = db_dict["collection"]
+ the_filter = db_dict["filter"]
+ the_path = db_dict["path"]
+ if not the_path[-1] == ".":
+ the_path = the_path + "."
update_dict = {
- the_path + 'status': str(status.value),
- the_path + 'detailed-status': detailed_status,
- the_path + 'VCA-status': vca_status,
- the_path + 'entity-type': entity_type,
- the_path + 'status-time': str(time.time()),
+ the_path + "status": str(status.value),
+ the_path + "detailed-status": detailed_status,
+ the_path + "VCA-status": vca_status,
+ the_path + "entity-type": entity_type,
+ the_path + "status-time": str(time.time()),
}
self.db.set_one(
table=the_table,
q_filter=the_filter,
update_dict=update_dict,
- fail_on_empty=True
+ fail_on_empty=True,
)
# database callback
if self.on_update_db:
if asyncio.iscoroutinefunction(self.on_update_db):
- await self.on_update_db(the_table, the_filter, the_path, update_dict)
+ await self.on_update_db(
+ the_table, the_filter, the_path, update_dict, vca_id=vca_id
+ )
else:
- self.on_update_db(the_table, the_filter, the_path, update_dict)
+ self.on_update_db(
+ the_table, the_filter, the_path, update_dict, vca_id=vca_id
+ )
except DbException as e:
if e.http_code == HTTPStatus.NOT_FOUND:
- self.error('NOT_FOUND error: Exception writing status to database: {}'.format(e))
+ self.log.error(
+ "NOT_FOUND error: Exception writing status to database: {}".format(
+ e
+ )
+ )
else:
- self.info('Exception writing status to database: {}'.format(e))
-
-
-def juju_status_2_osm_status(type: str, status: str) -> N2VCDeploymentStatus:
- if type == 'application' or type == 'unit':
- if status in ['waiting', 'maintenance']:
- return N2VCDeploymentStatus.RUNNING
- elif status in ['active']:
- return N2VCDeploymentStatus.COMPLETED
- elif status in ['blocked']:
- return N2VCDeploymentStatus.RUNNING
- else:
- return N2VCDeploymentStatus.UNKNOWN
- elif type == 'action':
- if status in ['running']:
- return N2VCDeploymentStatus.RUNNING
- elif status in ['completed']:
- return N2VCDeploymentStatus.COMPLETED
- else:
- return N2VCDeploymentStatus.UNKNOWN
- elif type == 'machine':
- if status in ['pending']:
- return N2VCDeploymentStatus.PENDING
- elif status in ['started']:
- return N2VCDeploymentStatus.COMPLETED
- else:
- return N2VCDeploymentStatus.UNKNOWN
+ self.log.info("Exception writing status to database: {}".format(e))
- return N2VCDeploymentStatus.FAILED
+ def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
+ if status not in JujuStatusToOSM[entity_type]:
+ self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
+ return N2VCDeploymentStatus.UNKNOWN
+ return JujuStatusToOSM[entity_type][status]
+
+
+def obj_to_yaml(obj: object) -> str:
+ # dump to yaml
+ dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
+ # split lines
+ lines = dump_text.splitlines()
+ # remove !!python/object tags
+ yaml_text = ""
+ for line in lines:
+ index = line.find("!!python/object")
+ if index >= 0:
+ line = line[:index]
+ yaml_text += line + "\n"
+ return yaml_text
+
+
+def obj_to_dict(obj: object) -> dict:
+ # convert obj to yaml
+ yaml_text = obj_to_yaml(obj)
+ # parse to dict
+ return yaml.load(yaml_text, Loader=yaml.Loader)