import abc
import asyncio
from http import HTTPStatus
+from shlex import quote
import os
import shlex
import subprocess
import yaml
from n2vc.loggable import Loggable
-from n2vc.utils import EntityType, JujuStatusToOSM, N2VCDeploymentStatus
+from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
class N2VCConnector(abc.ABC, Loggable):
db: object,
fs: object,
log: object,
- loop: object,
- url: str,
- username: str,
- vca_config: dict,
on_update_db=None,
+ **kwargs,
):
"""Initialize N2VC abstract connector. It defines de API for VCA connectors
:param object fs: FileSystem object managing the package artifacts (repo common
FsBase)
:param object log: the logging object to log to
- :param object loop: the loop to use for asyncio (default current thread loop)
- :param str url: a string that how to connect to the VCA (if needed, IP and port
- can be obtained from there)
- :param str username: the username to authenticate with VCA
- :param dict vca_config: Additional parameters for the specific VCA. For example,
- for juju it will contain:
- secret: The password to authenticate with
- public_key: The contents of the juju public SSH key
- ca_cert str: The CA certificate used to authenticate
:param on_update_db: callback called when n2vc connector updates database.
Received arguments:
table: e.g. "nsrs"
if fs is None:
raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])
- self.log.info(
- "url={}, username={}, vca_config={}".format(
- url,
- username,
- {
- k: v
- for k, v in vca_config.items()
- if k
- not in ("host", "port", "user", "secret", "public_key", "ca_cert")
- },
- )
- )
-
# store arguments into self
self.db = db
self.fs = fs
- self.loop = loop or asyncio.get_event_loop()
- self.url = url
- self.username = username
- self.vca_config = vca_config
self.on_update_db = on_update_db
# generate private/public key-pair
self.private_key_path = None
self.public_key_path = None
- self.get_public_key()
@abc.abstractmethod
async def get_status(self, namespace: str, yaml_format: bool = True):
# Find the path where we expect our key lives (~/.ssh)
homedir = os.environ.get("HOME")
if not homedir:
- self.warning("No HOME environment variable, using /tmp")
+ self.log.warning("No HOME environment variable, using /tmp")
homedir = "/tmp"
sshdir = "{}/.ssh".format(homedir)
+ sshdir = os.path.realpath(os.path.normpath(os.path.abspath(sshdir)))
if not os.path.exists(sshdir):
os.mkdir(sshdir)
self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
+ self.private_key_path = os.path.realpath(
+ os.path.normpath(os.path.abspath(self.private_key_path))
+ )
self.public_key_path = "{}.pub".format(self.private_key_path)
+ self.public_key_path = os.path.realpath(
+ os.path.normpath(os.path.abspath(self.public_key_path))
+ )
# If we don't have a key generated, then we have to generate it using ssh-keygen
if not os.path.exists(self.private_key_path):
- cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
- "rsa", "4096", self.private_key_path
+ command = "ssh-keygen -t {} -b {} -N '' -f {}".format(
+ "rsa", "4096", quote(self.private_key_path)
)
# run command with arguments
- subprocess.check_output(shlex.split(cmd))
+ args = shlex.split(command)
+ subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Read the public key. Only one public key (one line) in the file
with open(self.public_key_path, "r") as file:
reuse_ee_id: str = None,
progress_timeout: float = None,
total_timeout: float = None,
- ) -> (str, dict):
+ ) -> tuple[str, dict]:
"""Create an Execution Environment. Returns when it is created or raises an
exception on failing
:param float total_timeout:
"""
+ @abc.abstractmethod
+ async def install_k8s_proxy_charm(
+ self,
+ charm_name: str,
+ namespace: str,
+ artifact_path: str,
+ db_dict: dict,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ config: dict = None,
+ ) -> str:
+ """
+ Install a k8s proxy charm
+
+ :param charm_name: Name of the charm being deployed
+ :param namespace: collection of all the uuids related to the charm.
+ :param str artifact_path: where to locate the artifacts (parent folder) using
+ the self.fs
+ the final artifact path will be a combination of this artifact_path and
+ additional string from the config_dict (e.g. charm name)
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ :param float progress_timeout:
+ :param float total_timeout:
+ :param config: Dictionary with additional configuration
+
+ :returns ee_id: execution environment id.
+ """
+
@abc.abstractmethod
async def get_ee_ssh_public__key(
self,
# TODO
@abc.abstractmethod
async def remove_relation(self):
- """
- """
+ """ """
# TODO
@abc.abstractmethod
async def deregister_execution_environments(self):
- """
- """
+ """ """
@abc.abstractmethod
async def delete_namespace(
:param float total_timeout:
"""
+ @abc.abstractmethod
+ async def upgrade_charm(
+ self,
+ ee_id: str = None,
+ path: str = None,
+ charm_id: str = None,
+ charm_type: str = None,
+ timeout: float = None,
+ ) -> str:
+ """This method upgrade charms in VNFs
+
+ Args:
+ ee_id: Execution environment id
+ path: Local path to the charm
+ charm_id: charm-id
+ charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+ timeout: (Float) Timeout for the ns update operation
+
+ Returns:
+ The output of the update operation if status equals to "completed"
+ """
+
@abc.abstractmethod
async def exec_primitive(
self,
####################################################################################
"""
- def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
+ def _get_namespace_components(
+ self, namespace: str
+ ) -> tuple[str, str, str, str, str]:
"""
Split namespace components
detailed_status: str,
vca_status: str,
entity_type: str,
+ vca_id: str = None,
):
+ """
+ Write application status to database
+
+ :param: db_dict: DB dictionary
+ :param: status: Status of the application
+ :param: detailed_status: Detailed status
+ :param: vca_status: VCA status
+ :param: entity_type: Entity type ("application", "machine, and "action")
+ :param: vca_id: Id of the VCA. If None, the default VCA will be used.
+ """
if not db_dict:
self.log.debug("No db_dict => No database write")
return
# .format(str(status.value), detailed_status, vca_status, entity_type))
try:
-
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]
if self.on_update_db:
if asyncio.iscoroutinefunction(self.on_update_db):
await self.on_update_db(
- the_table, the_filter, the_path, update_dict
+ the_table, the_filter, the_path, update_dict, vca_id=vca_id
)
else:
- self.on_update_db(the_table, the_filter, the_path, update_dict)
+ self.on_update_db(
+ the_table, the_filter, the_path, update_dict, vca_id=vca_id
+ )
except DbException as e:
if e.http_code == HTTPStatus.NOT_FOUND:
else:
self.log.info("Exception writing status to database: {}".format(e))
- def osm_status(self, entity_type: EntityType, status: str) -> N2VCDeploymentStatus:
+ def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
if status not in JujuStatusToOSM[entity_type]:
- self.log.warning("Status {} not found in JujuStatusToOSM.")
+ self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
return N2VCDeploymentStatus.UNKNOWN
return JujuStatusToOSM[entity_type][status]
-# DEPRECATED
-def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
- if statustype == "application" or statustype == "unit":
- if status in ["waiting", "maintenance"]:
- return N2VCDeploymentStatus.RUNNING
- if status in ["error"]:
- return N2VCDeploymentStatus.FAILED
- elif status in ["active"]:
- return N2VCDeploymentStatus.COMPLETED
- elif status in ["blocked"]:
- return N2VCDeploymentStatus.RUNNING
- else:
- return N2VCDeploymentStatus.UNKNOWN
- elif statustype == "action":
- if status in ["running"]:
- return N2VCDeploymentStatus.RUNNING
- elif status in ["completed"]:
- return N2VCDeploymentStatus.COMPLETED
- else:
- return N2VCDeploymentStatus.UNKNOWN
- elif statustype == "machine":
- if status in ["pending"]:
- return N2VCDeploymentStatus.PENDING
- elif status in ["started"]:
- return N2VCDeploymentStatus.COMPLETED
- else:
- return N2VCDeploymentStatus.UNKNOWN
-
- return N2VCDeploymentStatus.FAILED
-
-
def obj_to_yaml(obj: object) -> str:
# dump to yaml
dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
# convert obj to yaml
yaml_text = obj_to_yaml(obj)
# parse to dict
- return yaml.load(yaml_text, Loader=yaml.Loader)
+ return yaml.load(yaml_text, Loader=yaml.SafeLoader)