From: David Garcia Date: Mon, 12 Apr 2021 10:07:37 +0000 (+0200) Subject: Feature 10239: Distributed VCA X-Git-Tag: release-v10.0-start~3 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=eb8943a887e2fb8cce0240382811f9e504f3c7fb;p=osm%2FN2VC.git Feature 10239: Distributed VCA - Add vca_id in all calls that invoke libjuju. This is for being able to talk to the default VCA or the VCA associated to the VIM - Add store.py: Abstraction to talk to the database. - DBMongoStore: Use the db from common to talk to the database - MotorStore: Use motor, an asynchronous mongodb client to talk to the database - Add vca/connection.py: Represents the data needed to connect the VCA - Add EnvironConfig in config.py: Class to get the environment config, and avoid LCM from passing that Change-Id: I28625e0c56ce408114022c83d4b7cacbb649434c Signed-off-by: David Garcia --- diff --git a/n2vc/config.py b/n2vc/config.py index 59a74be..374ec73 100644 --- a/n2vc/config.py +++ b/n2vc/config.py @@ -12,6 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import typing + + +class EnvironConfig(dict): + prefixes = ["OSMLCM_VCA_", "OSMMON_VCA_"] + + def __init__(self, prefixes: typing.List[str] = None): + if prefixes: + self.prefixes = prefixes + for key, value in os.environ.items(): + if any(key.startswith(prefix) for prefix in self.prefixes): + self.__setitem__(self._get_renamed_key(key), value) + + def _get_renamed_key(self, key: str) -> str: + for prefix in self.prefixes: + key = key.replace(prefix, "") + return key.lower() + + MODEL_CONFIG_KEYS = [ "agent-metadata-url", "agent-stream", diff --git a/n2vc/juju_watcher.py b/n2vc/juju_watcher.py index 04ad10f..e206e06 100644 --- a/n2vc/juju_watcher.py +++ b/n2vc/juju_watcher.py @@ -72,7 +72,10 @@ def application_ready(application: Application) -> bool: class JujuModelWatcher: @staticmethod - async def wait_for_model(model: Model, timeout: float = 3600): + async def wait_for_model( + model: Model, + timeout: float = 3600 + ): """ Wait for all entities in model to reach its final state. @@ -121,6 +124,7 @@ class JujuModelWatcher: total_timeout: float = 3600, db_dict: dict = None, n2vc: N2VCConnector = None, + vca_id: str = None, ): """ Wait for entity to reach its final state. @@ -131,6 +135,7 @@ class JujuModelWatcher: :param: total_timeout: Timeout for the entity to be active :param: db_dict: Dictionary with data of the DB to write the updates :param: n2vc: N2VC Connector objector + :param: vca_id: VCA ID :raises: asyncio.TimeoutError when timeout reaches """ @@ -161,6 +166,7 @@ class JujuModelWatcher: timeout=progress_timeout, db_dict=db_dict, n2vc=n2vc, + vca_id=vca_id, ) ) @@ -182,6 +188,7 @@ class JujuModelWatcher: timeout: float, db_dict: dict = None, n2vc: N2VCConnector = None, + vca_id: str = None, ): """ Observes the changes related to an specific entity in a model @@ -192,6 +199,7 @@ class JujuModelWatcher: :param: timeout: Maximum time between two updates in the model :param: db_dict: Dictionary with data of the DB to write the updates :param: n2vc: N2VC Connector objector + :param: vca_id: VCA ID :raises: asyncio.TimeoutError when timeout reaches """ @@ -249,6 +257,7 @@ class JujuModelWatcher: detailed_status=status_message, vca_status=vca_status, entity_type=delta_entity, + vca_id=vca_id, ) # Check if timeout if time.time() > timeout_end: diff --git a/n2vc/k8s_helm3_conn.py b/n2vc/k8s_helm3_conn.py index 6afadbf..7d69168 100644 --- a/n2vc/k8s_helm3_conn.py +++ b/n2vc/k8s_helm3_conn.py @@ -78,7 +78,25 @@ class K8sHelm3Connector(K8sHelmBaseConnector): db_dict: dict = None, kdu_name: str = None, namespace: str = None, + **kwargs, ): + """Install a helm chart + + :param cluster_uuid str: The UUID of the cluster to install to + :param kdu_model str: The name or path of a bundle to install + :param kdu_instance: Kdu instance name + :param atomic bool: If set, waits until the model is active and resets + the cluster on failure. + :param timeout int: The time, in seconds, to wait for the install + to finish + :param params dict: Key-value pairs of instantiation parameters + :param kdu_name: Name of the KDU instance to be installed + :param namespace: K8s namespace to use for the KDU instance + + :param kwargs: Additional parameters (None yet) + + :return: True if successful + """ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id)) diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index a79f038..0d001ee 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -100,7 +100,7 @@ class K8sHelmBaseConnector(K8sConnector): return namespace, cluster_id async def init_env( - self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None + self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs, ) -> (str, bool): """ It prepares a given K8s cluster environment to run Charts @@ -110,6 +110,7 @@ class K8sHelmBaseConnector(K8sConnector): :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse + :param kwargs: Additional parameters (None yet) :return: uuid of the K8s cluster and True if connector has installed some software in the cluster (on error, an exception will be raised) @@ -236,9 +237,18 @@ class K8sHelmBaseConnector(K8sConnector): self.fs.reverse_sync(from_path=cluster_id) async def reset( - self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False + self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False, **kwargs ) -> bool: + """Reset a cluster + Resets the Kubernetes cluster by removing the helm deployment that represents it. + + :param cluster_uuid: The UUID of the cluster to reset + :param force: Boolean to force the reset + :param uninstall_sw: Boolean to force the reset + :param kwargs: Additional parameters (None yet) + :return: Returns True if successful or raises an exception. + """ namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}" .format(cluster_id, uninstall_sw)) @@ -569,7 +579,7 @@ class K8sHelmBaseConnector(K8sConnector): else: return 0 - async def uninstall(self, cluster_uuid: str, kdu_instance: str): + async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs): """ Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call (this call should happen after all _terminate-config-primitive_ of the VNF @@ -577,6 +587,7 @@ class K8sHelmBaseConnector(K8sConnector): :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id :param kdu_instance: unique name for the KDU instance to be deleted + :param kwargs: Additional parameters (None yet) :return: True if successful """ @@ -648,6 +659,7 @@ class K8sHelmBaseConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + **kwargs, ) -> str: """Exec primitive (Juju action) @@ -657,6 +669,7 @@ class K8sHelmBaseConnector(K8sConnector): :param timeout: Timeout for action execution :param params: Dictionary of all the parameters needed for the action :db_dict: Dictionary for any additional data + :param kwargs: Additional parameters (None yet) :return: Returns the output of the action """ @@ -730,8 +743,30 @@ class K8sHelmBaseConnector(K8sConnector): return service - async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str: + async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str: + """ + This call would retrieve tha current state of a given KDU instance. It would be + would allow to retrieve the _composition_ (i.e. K8s objects) and _specific + values_ of the configuration parameters applied to a given instance. This call + would be based on the `status` call. + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_instance: unique name for the KDU instance + :param kwargs: Additional parameters (None yet) + :return: If successful, it will return the following vector of arguments: + - K8s `namespace` in the cluster where the KDU lives + - `state` of the KDU instance. It can be: + - UNKNOWN + - DEPLOYED + - DELETED + - SUPERSEDED + - FAILED or + - DELETING + - List of `resources` (objects) that this release consists of, sorted by kind, + and the status of those resources + - Last `deployment_time`. + + """ self.log.debug( "status_kdu: cluster_uuid: {}, kdu_instance: {}".format( cluster_uuid, kdu_instance diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index ad230b5..d443f8d 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -99,7 +99,36 @@ class K8sHelmConnector(K8sHelmBaseConnector): db_dict: dict = None, kdu_name: str = None, namespace: str = None, + **kwargs, ): + """ + Deploys of a new KDU instance. It would implicitly rely on the `install` call + to deploy the Chart/Bundle properly parametrized (in practice, this call would + happen before any _initial-config-primitive_of the VNF is called). + + :param cluster_uuid: UUID of a K8s cluster known by OSM + :param kdu_model: chart/ reference (string), which can be either + of these options: + - a name of chart available via the repos known by OSM + - a path to a packaged chart + - a path to an unpacked chart directory or a URL + :param kdu_instance: Kdu instance name + :param atomic: If set, installation process purges chart/bundle on fail, also + will wait until all the K8s objects are active + :param timeout: Time in seconds to wait for the install of the chart/bundle + (defaults to Helm default timeout: 300s) + :param params: dictionary of key-value pairs for instantiation parameters + (overriding default values) + :param dict db_dict: where to write into database when the status changes. + It contains a dict with {collection: , filter: {}, + path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.K8S.3"} + :param kdu_name: Name of the KDU instance to be installed + :param namespace: K8s namespace to use for the KDU instance + :param kwargs: Additional parameters (None yet) + :return: True if successful + """ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id)) diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py index 3130216..e3ec17e 100644 --- a/n2vc/k8s_juju_conn.py +++ b/n2vc/k8s_juju_conn.py @@ -20,15 +20,16 @@ import tempfile import binascii import base64 -from n2vc.config import ModelConfig -from n2vc.exceptions import K8sException, N2VCBadArgumentsException +from n2vc.config import EnvironConfig +from n2vc.exceptions import K8sException from n2vc.k8s_conn import K8sConnector from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT from .exceptions import MethodNotImplemented -from n2vc.utils import base64_to_cacert from n2vc.libjuju import Libjuju from n2vc.utils import obj_to_dict, obj_to_yaml - +from n2vc.store import MotorStore +from n2vc.vca.cloud import Cloud +from n2vc.vca.connection import get_connection from kubernetes.client.models import ( V1ClusterRole, V1ObjectMeta, @@ -54,6 +55,8 @@ def generate_rbac_id(): class K8sJujuConnector(K8sConnector): + libjuju = None + def __init__( self, fs: object, @@ -63,7 +66,6 @@ class K8sJujuConnector(K8sConnector): log: object = None, loop: object = None, on_update_db=None, - vca_config: dict = None, ): """ :param fs: file system for kubernetes and helm configuration @@ -86,35 +88,10 @@ class K8sJujuConnector(K8sConnector): self.loop = loop or asyncio.get_event_loop() self.log.debug("Initializing K8S Juju connector") - required_vca_config = [ - "host", - "user", - "secret", - "ca_cert", - ] - if not vca_config or not all(k in vca_config for k in required_vca_config): - raise N2VCBadArgumentsException( - message="Missing arguments in vca_config: {}".format(vca_config), - bad_args=required_vca_config, - ) - port = vca_config["port"] if "port" in vca_config else 17070 - url = "{}:{}".format(vca_config["host"], port) - model_config = ModelConfig(vca_config) - username = vca_config["user"] - secret = vca_config["secret"] - ca_cert = base64_to_cacert(vca_config["ca_cert"]) - - self.libjuju = Libjuju( - endpoint=url, - api_proxy=None, # Not needed for k8s charms - model_config=model_config, - username=username, - password=secret, - cacert=ca_cert, - loop=self.loop, - log=self.log, - db=self.db, - ) + db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri") + self._store = MotorStore(db_uri) + self.loading_libjuju = asyncio.Lock(loop=self.loop) + self.log.debug("K8S Juju connector initialized") # TODO: Remove these commented lines: # self.authenticated = False @@ -128,6 +105,7 @@ class K8sJujuConnector(K8sConnector): k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid: str = None, + **kwargs, ) -> (str, bool): """ It prepares a given K8s cluster environment to run Juju bundles. @@ -137,10 +115,14 @@ class K8sJujuConnector(K8sConnector): :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse + :param: kwargs: Additional parameters + vca_id (str): VCA ID + :return: uuid of the K8s cluster and True if connector has installed some software in the cluster (on error, an exception will be raised) """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4()) @@ -199,7 +181,7 @@ class K8sJujuConnector(K8sConnector): ) default_storage_class = kubectl.get_default_storage_class() - await self.libjuju.add_k8s( + await libjuju.add_k8s( name=cluster_uuid, rbac_id=rbac_id, token=token, @@ -248,25 +230,34 @@ class K8sJujuConnector(K8sConnector): """Reset""" async def reset( - self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False + self, + cluster_uuid: str, + force: bool = False, + uninstall_sw: bool = False, + **kwargs, ) -> bool: """Reset a cluster Resets the Kubernetes cluster by removing the model that represents it. :param cluster_uuid str: The UUID of the cluster to reset + :param force: Force reset + :param uninstall_sw: Boolean to uninstall sw + :param: kwargs: Additional parameters + vca_id (str): VCA ID + :return: Returns True if successful or raises an exception. """ try: self.log.debug("[reset] Removing k8s cloud") + libjuju = await self._get_libjuju(kwargs.get("vca_id")) - cloud_creds = await self.libjuju.get_cloud_credentials( - cluster_uuid, - self._get_credential_name(cluster_uuid), - ) + cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid)) + + cloud_creds = await libjuju.get_cloud_credentials(cloud) - await self.libjuju.remove_cloud(cluster_uuid) + await libjuju.remove_cloud(cluster_uuid) kubecfg = self.get_credentials(cluster_uuid=cluster_uuid) @@ -310,6 +301,7 @@ class K8sJujuConnector(K8sConnector): db_dict: dict = None, kdu_name: str = None, namespace: str = None, + **kwargs, ) -> bool: """Install a bundle @@ -323,9 +315,12 @@ class K8sJujuConnector(K8sConnector): :param params dict: Key-value pairs of instantiation parameters :param kdu_name: Name of the KDU instance to be installed :param namespace: K8s namespace to use for the KDU instance + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: If successful, returns ? """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) bundle = kdu_model if not db_dict: @@ -347,11 +342,8 @@ class K8sJujuConnector(K8sConnector): # Create the new model self.log.debug("Adding model: {}".format(kdu_instance)) - await self.libjuju.add_model( - model_name=kdu_instance, - cloud_name=cluster_uuid, - credential_name=self._get_credential_name(cluster_uuid), - ) + cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid)) + await libjuju.add_model(kdu_instance, cloud) # if model: # TODO: Instantiation parameters @@ -370,12 +362,17 @@ class K8sJujuConnector(K8sConnector): previous_workdir = "/app/storage" self.log.debug("[install] deploying {}".format(bundle)) - await self.libjuju.deploy( + await libjuju.deploy( bundle, model_name=kdu_instance, wait=atomic, timeout=timeout ) os.chdir(previous_workdir) if self.on_update_db: - await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"]) + await self.on_update_db( + cluster_uuid, + kdu_instance, + filter=db_dict["filter"], + vca_id=kwargs.get("vca_id") + ) return True async def instances_list(self, cluster_uuid: str) -> list: @@ -442,18 +439,26 @@ class K8sJujuConnector(K8sConnector): """Deletion""" - async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool: + async def uninstall( + self, + cluster_uuid: str, + kdu_instance: str, + **kwargs, + ) -> bool: """Uninstall a KDU instance :param cluster_uuid str: The UUID of the cluster :param kdu_instance str: The unique name of the KDU instance + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns True if successful, or raises an exception """ self.log.debug("[uninstall] Destroying model") + libjuju = await self._get_libjuju(kwargs.get("vca_id")) - await self.libjuju.destroy_model(kdu_instance, total_timeout=3600) + await libjuju.destroy_model(kdu_instance, total_timeout=3600) # self.log.debug("[uninstall] Model destroyed and disconnecting") # await controller.disconnect() @@ -472,6 +477,7 @@ class K8sJujuConnector(K8sConnector): timeout: float = 300, params: dict = None, db_dict: dict = None, + **kwargs, ) -> str: """Exec primitive (Juju action) @@ -480,10 +486,13 @@ class K8sJujuConnector(K8sConnector): :param primitive_name: Name of action that will be executed :param timeout: Timeout for action execution :param params: Dictionary of all the parameters needed for the action - :db_dict: Dictionary for any additional data + :param db_dict: Dictionary for any additional data + :param kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns the output of the action """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) if not params or "application-name" not in params: raise K8sException( @@ -496,10 +505,10 @@ class K8sJujuConnector(K8sConnector): "kdu_instance: {}".format(kdu_instance) ) application_name = params["application-name"] - actions = await self.libjuju.get_actions(application_name, kdu_instance) + actions = await libjuju.get_actions(application_name, kdu_instance) if primitive_name not in actions: raise K8sException("Primitive {} not found".format(primitive_name)) - output, status = await self.libjuju.execute_action( + output, status = await libjuju.execute_action( application_name, kdu_instance, primitive_name, **params ) @@ -593,7 +602,8 @@ class K8sJujuConnector(K8sConnector): cluster_uuid: str, kdu_instance: str, complete_status: bool = False, - yaml_format: bool = False + yaml_format: bool = False, + **kwargs, ) -> dict: """Get the status of the KDU @@ -603,13 +613,16 @@ class K8sJujuConnector(K8sConnector): :param kdu_instance str: The unique id of the KDU instance :param complete_status: To get the complete_status of the KDU :param yaml_format: To get the status in proper format for NSR record + :param: kwargs: Additional parameters + vca_id (str): VCA ID :return: Returns a dictionary containing namespace, state, resources, and deployment_time and returns complete_status if complete_status is True """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) status = {} - model_status = await self.libjuju.get_model_status(kdu_instance) + model_status = await libjuju.get_model_status(kdu_instance) if not complete_status: for name in model_status.applications: @@ -623,28 +636,31 @@ class K8sJujuConnector(K8sConnector): return status - async def update_vca_status(self, vcastatus: dict, kdu_instance: str): + async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs): """ Add all configs, actions, executed actions of all applications in a model to vcastatus dict :param vcastatus dict: dict containing vcastatus :param kdu_instance str: The unique id of the KDU instance + :param: kwargs: Additional parameters + vca_id (str): VCA ID :return: None """ + libjuju = await self._get_libjuju(kwargs.get("vca_id")) try: for model_name in vcastatus: # Adding executed actions vcastatus[model_name]["executedActions"] = \ - await self.libjuju.get_executed_actions(kdu_instance) + await libjuju.get_executed_actions(kdu_instance) for application in vcastatus[model_name]["applications"]: # Adding application actions vcastatus[model_name]["applications"][application]["actions"] = \ - await self.libjuju.get_actions(application, kdu_instance) + await libjuju.get_actions(application, kdu_instance) # Adding application configs vcastatus[model_name]["applications"][application]["configs"] = \ - await self.libjuju.get_application_configs(kdu_instance, application) + await libjuju.get_application_configs(kdu_instance, application) except Exception as e: self.log.debug("Error in updating vca status: {}".format(str(e))) @@ -853,3 +869,28 @@ class K8sJujuConnector(K8sConnector): else: kdu_instance = db_dict["filter"]["_id"] return kdu_instance + + async def _get_libjuju(self, vca_id: str = None) -> Libjuju: + """ + Get libjuju object + + :param: vca_id: VCA ID + If None, get a libjuju object with a Connection to the default VCA + Else, geta libjuju object with a Connection to the specified VCA + """ + if not vca_id: + while self.loading_libjuju.locked(): + await asyncio.sleep(0.1) + if not self.libjuju: + async with self.loading_libjuju: + vca_connection = await get_connection(self._store) + self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log) + return self.libjuju + else: + vca_connection = await get_connection(self._store, vca_id) + return Libjuju( + vca_connection, + loop=self.loop, + log=self.log, + n2vc=self, + ) diff --git a/n2vc/libjuju.py b/n2vc/libjuju.py index eb0fa72..a6fd8fe 100644 --- a/n2vc/libjuju.py +++ b/n2vc/libjuju.py @@ -14,6 +14,7 @@ import asyncio import logging +import typing import time @@ -32,7 +33,6 @@ from juju.controller import Controller from juju.client import client from juju import tag -from n2vc.config import ModelConfig from n2vc.juju_watcher import JujuModelWatcher from n2vc.provisioner import AsyncSSHProvisioner from n2vc.n2vc_conn import N2VCConnector @@ -44,11 +44,13 @@ from n2vc.exceptions import ( JujuControllerFailedConnecting, JujuApplicationExists, JujuInvalidK8sConfiguration, - JujuError + JujuError, ) -from n2vc.utils import DB_DATA -from osm_common.dbbase import DbException +from n2vc.vca.cloud import Cloud as VcaCloud +from n2vc.vca.connection import Connection from kubernetes.client.configuration import Configuration +from retrying_async import retry + RBAC_LABEL_KEY_NAME = "rbac-id" @@ -56,63 +58,35 @@ RBAC_LABEL_KEY_NAME = "rbac-id" class Libjuju: def __init__( self, - endpoint: str, - api_proxy: str, - username: str, - password: str, - cacert: str, + vca_connection: Connection, loop: asyncio.AbstractEventLoop = None, log: logging.Logger = None, - db: dict = None, n2vc: N2VCConnector = None, - model_config: ModelConfig = {}, ): """ Constructor - :param: endpoint: Endpoint of the juju controller (host:port) - :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs - :param: username: Juju username - :param: password: Juju password - :param: cacert: Juju CA Certificate + :param: vca_connection: n2vc.vca.connection object :param: loop: Asyncio loop :param: log: Logger - :param: db: DB object :param: n2vc: N2VC object - :param: apt_mirror: APT Mirror - :param: enable_os_upgrade: Enable OS Upgrade """ self.log = log or logging.getLogger("Libjuju") - self.db = db - db_endpoints = self._get_api_endpoints_db() - self.endpoints = None - if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints: - self.endpoints = [endpoint] - self._update_api_endpoints_db(self.endpoints) - else: - self.endpoints = db_endpoints - self.api_proxy = api_proxy - self.username = username - self.password = password - self.cacert = cacert - self.loop = loop or asyncio.get_event_loop() self.n2vc = n2vc + self.vca_connection = vca_connection - # Generate config for models - self.model_config = model_config - + self.loop = loop or asyncio.get_event_loop() self.loop.set_exception_handler(self.handle_exception) self.creating_model = asyncio.Lock(loop=self.loop) - self.log.debug("Libjuju initialized!") - - self.health_check_task = self._create_health_check_task() + if self.vca_connection.is_default: + self.health_check_task = self._create_health_check_task() def _create_health_check_task(self): return self.loop.create_task(self.health_check()) - async def get_controller(self, timeout: float = 15.0) -> Controller: + async def get_controller(self, timeout: float = 60.0) -> Controller: """ Get controller @@ -123,23 +97,27 @@ class Libjuju: controller = Controller(loop=self.loop) await asyncio.wait_for( controller.connect( - endpoint=self.endpoints, - username=self.username, - password=self.password, - cacert=self.cacert, + endpoint=self.vca_connection.data.endpoints, + username=self.vca_connection.data.user, + password=self.vca_connection.data.secret, + cacert=self.vca_connection.data.cacert, ), timeout=timeout, ) - endpoints = await controller.api_endpoints - if self.endpoints != endpoints: - self.endpoints = endpoints - self._update_api_endpoints_db(self.endpoints) + if self.vca_connection.is_default: + endpoints = await controller.api_endpoints + if not all( + endpoint in self.vca_connection.endpoints for endpoint in endpoints + ): + await self.vca_connection.update_endpoints(endpoints) return controller except asyncio.CancelledError as e: raise e except Exception as e: self.log.error( - "Failed connecting to controller: {}...".format(self.endpoints) + "Failed connecting to controller: {}... {}".format( + self.vca_connection.data.endpoints, e + ) ) if controller: await self.disconnect_controller(controller) @@ -168,14 +146,13 @@ class Libjuju: if controller: await controller.disconnect() - async def add_model(self, model_name: str, cloud_name: str, credential_name=None): + @retry(attempts=3, delay=5, timeout=None) + async def add_model(self, model_name: str, cloud: VcaCloud): """ Create model :param: model_name: Model name - :param: cloud_name: Cloud name - :param: credential_name: Credential name to use for adding the model - If not specified, same name as the cloud will be used. + :param: cloud: Cloud object """ # Get controller @@ -193,10 +170,15 @@ class Libjuju: self.log.debug("Creating model {}".format(model_name)) model = await controller.add_model( model_name, - config=self.model_config, - cloud_name=cloud_name, - credential_name=credential_name or cloud_name, + config=self.vca_connection.data.model_config, + cloud_name=cloud.name, + credential_name=cloud.credential_name, ) + except JujuAPIError as e: + if "already exists" in e.message: + pass + else: + raise e finally: if model: await self.disconnect_model(model) @@ -221,25 +203,35 @@ class Libjuju: actions.update(application_actions) # Get status of all actions for application_action in actions: - app_action_status_list = await model.get_action_status(name=application_action) + app_action_status_list = await model.get_action_status( + name=application_action + ) for action_id, action_status in app_action_status_list.items(): - executed_action = {"id": action_id, "action": application_action, - "status": action_status} + executed_action = { + "id": action_id, + "action": application_action, + "status": action_status, + } # Get action output by id action_status = await model.get_action_output(executed_action["id"]) for k, v in action_status.items(): executed_action[k] = v executed_actions.append(executed_action) except Exception as e: - raise JujuError("Error in getting executed actions for model: {}. Error: {}" - .format(model_name, str(e))) + raise JujuError( + "Error in getting executed actions for model: {}. Error: {}".format( + model_name, str(e) + ) + ) finally: if model: await self.disconnect_model(model) await self.disconnect_controller(controller) return executed_actions - async def get_application_configs(self, model_name: str, application_name: str) -> dict: + async def get_application_configs( + self, model_name: str, application_name: str + ) -> dict: """ Get available configs for an application. @@ -253,20 +245,24 @@ class Libjuju: controller = await self.get_controller() try: model = await self.get_model(controller, model_name) - application = self._get_application(model, application_name=application_name) + application = self._get_application( + model, application_name=application_name + ) application_configs = await application.get_config() except Exception as e: - raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}" - .format(application_name, model_name, str(e))) + raise JujuError( + "Error in getting configs for application: {} in model: {}. Error: {}".format( + application_name, model_name, str(e) + ) + ) finally: if model: await self.disconnect_model(model) await self.disconnect_controller(controller) return application_configs - async def get_model( - self, controller: Controller, model_name: str, id=None - ) -> Model: + @retry(attempts=3, delay=5) + async def get_model(self, controller: Controller, model_name: str) -> Model: """ Get model from controller @@ -277,9 +273,7 @@ class Libjuju: """ return await controller.get_model(model_name) - async def model_exists( - self, model_name: str, controller: Controller = None - ) -> bool: + async def model_exists(self, model_name: str, controller: Controller = None) -> bool: """ Check if model exists @@ -421,6 +415,7 @@ class Libjuju: total_timeout=total_timeout, db_dict=db_dict, n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, ) finally: await self.disconnect_model(model) @@ -502,7 +497,7 @@ class Libjuju: connection=connection, nonce=params.nonce, machine_id=machine_id, - proxy=self.api_proxy, + proxy=self.vca_connection.data.api_proxy, series=params.series, ) ) @@ -533,6 +528,7 @@ class Libjuju: total_timeout=total_timeout, db_dict=db_dict, n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, ) except Exception as e: raise e @@ -648,6 +644,7 @@ class Libjuju: total_timeout=total_timeout, db_dict=db_dict, n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, ) self.log.debug( "Application {} is ready in model {}".format( @@ -720,19 +717,7 @@ class Libjuju: # because the leader elected hook has not been triggered yet. # Therefore, we are doing some retries. If it happens again, # re-open bug 1236 - attempts = 3 - time_between_retries = 10 - unit = None - for _ in range(attempts): - unit = await self._get_leader_unit(application) - if unit is None: - await asyncio.sleep(time_between_retries) - else: - break - if unit is None: - raise JujuLeaderUnitNotFound( - "Cannot execute action: leader unit not found" - ) + unit = await self._get_leader_unit(application) actions = await application.get_actions() @@ -755,6 +740,7 @@ class Libjuju: total_timeout=total_timeout, db_dict=db_dict, n2vc=self.n2vc, + vca_id=self.vca_connection._vca_id, ) output = await model.get_action_output(action_uuid=action.entity_id) @@ -1041,57 +1027,6 @@ class Libjuju: await self.disconnect_model(model) await self.disconnect_controller(controller) - def _get_api_endpoints_db(self) -> [str]: - """ - Get API Endpoints from DB - - :return: List of API endpoints - """ - self.log.debug("Getting endpoints from database") - - juju_info = self.db.get_one( - DB_DATA.api_endpoints.table, - q_filter=DB_DATA.api_endpoints.filter, - fail_on_empty=False, - ) - if juju_info and DB_DATA.api_endpoints.key in juju_info: - return juju_info[DB_DATA.api_endpoints.key] - - def _update_api_endpoints_db(self, endpoints: [str]): - """ - Update API endpoints in Database - - :param: List of endpoints - """ - self.log.debug("Saving endpoints {} in database".format(endpoints)) - - juju_info = self.db.get_one( - DB_DATA.api_endpoints.table, - q_filter=DB_DATA.api_endpoints.filter, - fail_on_empty=False, - ) - # If it doesn't, then create it - if not juju_info: - try: - self.db.create( - DB_DATA.api_endpoints.table, - DB_DATA.api_endpoints.filter, - ) - except DbException as e: - # Racing condition: check if another N2VC worker has created it - juju_info = self.db.get_one( - DB_DATA.api_endpoints.table, - q_filter=DB_DATA.api_endpoints.filter, - fail_on_empty=False, - ) - if not juju_info: - raise e - self.db.set_one( - DB_DATA.api_endpoints.table, - DB_DATA.api_endpoints.filter, - {DB_DATA.api_endpoints.key: endpoints}, - ) - def handle_exception(self, loop, context): # All unhandled exceptions by libjuju are handled here. pass @@ -1283,19 +1218,32 @@ class Libjuju: finally: await self.disconnect_controller(controller) + @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound()) async def _get_leader_unit(self, application: Application) -> Unit: unit = None for u in application.units: if await u.is_leader_from_status(): unit = u break + if not unit: + raise Exception() return unit - async def get_cloud_credentials(self, cloud_name: str, credential_name: str): + async def get_cloud_credentials(self, cloud: Cloud) -> typing.List: + """ + Get cloud credentials + + :param: cloud: Cloud object. The returned credentials will be from this cloud. + + :return: List of credentials object associated to the specified cloud + + """ controller = await self.get_controller() try: facade = client.CloudFacade.from_connection(controller.connection()) - cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name) + cloud_cred_tag = tag.credential( + cloud.name, self.vca_connection.data.user, cloud.credential_name + ) params = [client.Entity(cloud_cred_tag)] return (await facade.Credential(params)).results finally: diff --git a/n2vc/n2vc_conn.py b/n2vc/n2vc_conn.py index 12704a3..bfdf460 100644 --- a/n2vc/n2vc_conn.py +++ b/n2vc/n2vc_conn.py @@ -55,9 +55,6 @@ class N2VCConnector(abc.ABC, Loggable): fs: object, log: object, loop: object, - url: str, - username: str, - vca_config: dict, on_update_db=None, **kwargs, ): @@ -68,14 +65,6 @@ class N2VCConnector(abc.ABC, Loggable): FsBase) :param object log: the logging object to log to :param object loop: the loop to use for asyncio (default current thread loop) - :param str url: a string that how to connect to the VCA (if needed, IP and port - can be obtained from there) - :param str username: the username to authenticate with VCA - :param dict vca_config: Additional parameters for the specific VCA. For example, - for juju it will contain: - secret: The password to authenticate with - public_key: The contents of the juju public SSH key - ca_cert str: The CA certificate used to authenticate :param on_update_db: callback called when n2vc connector updates database. Received arguments: table: e.g. "nsrs" @@ -93,26 +82,10 @@ class N2VCConnector(abc.ABC, Loggable): if fs is None: raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"]) - self.log.info( - "url={}, username={}, vca_config={}".format( - url, - username, - { - k: v - for k, v in vca_config.items() - if k - not in ("host", "port", "user", "secret", "public_key", "ca_cert") - }, - ) - ) - # store arguments into self self.db = db self.fs = fs self.loop = loop or asyncio.get_event_loop() - self.url = url - self.username = username - self.vca_config = vca_config self.on_update_db = on_update_db # generate private/public key-pair @@ -443,7 +416,18 @@ class N2VCConnector(abc.ABC, Loggable): detailed_status: str, vca_status: str, entity_type: str, + vca_id: str = None, ): + """ + Write application status to database + + :param: db_dict: DB dictionary + :param: status: Status of the application + :param: detailed_status: Detailed status + :param: vca_status: VCA status + :param: entity_type: Entity type ("application", "machine, and "action") + :param: vca_id: Id of the VCA. If None, the default VCA will be used. + """ if not db_dict: self.log.debug("No db_dict => No database write") return @@ -477,10 +461,10 @@ class N2VCConnector(abc.ABC, Loggable): if self.on_update_db: if asyncio.iscoroutinefunction(self.on_update_db): await self.on_update_db( - the_table, the_filter, the_path, update_dict + the_table, the_filter, the_path, update_dict, vca_id=vca_id ) else: - self.on_update_db(the_table, the_filter, the_path, update_dict) + self.on_update_db(the_table, the_filter, the_path, update_dict, vca_id=vca_id) except DbException as e: if e.http_code == HTTPStatus.NOT_FOUND: diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py index c09c263..99661f5 100644 --- a/n2vc/n2vc_juju_conn.py +++ b/n2vc/n2vc_juju_conn.py @@ -22,9 +22,8 @@ import asyncio import logging -import os -from n2vc.config import ModelConfig +from n2vc.config import EnvironConfig from n2vc.exceptions import ( N2VCBadArgumentsException, N2VCException, @@ -32,23 +31,24 @@ from n2vc.exceptions import ( N2VCExecutionException, # N2VCNotFound, MethodNotImplemented, - JujuK8sProxycharmNotSupported, ) from n2vc.n2vc_conn import N2VCConnector from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml from n2vc.libjuju import Libjuju -from n2vc.utils import base64_to_cacert +from n2vc.store import MotorStore +from n2vc.vca.connection import get_connection class N2VCJujuConnector(N2VCConnectorlocalhost", "microk8s"] + libjuju = None def __init__( self, @@ -56,12 +56,16 @@ class N2VCJujuConnector(N2VCConnector): fs: object, log: object = None, loop: object = None, - url: str = "127.0.0.1:17070", - username: str = "admin", - vca_config: dict = None, on_update_db=None, ): - """Initialize juju N2VC connector + """ + Constructor + + :param: db: Database object from osm_common + :param: fs: Filesystem object from osm_common + :param: log: Logger + :param: loop: Asyncio loop + :param: on_update_db: Callback function to be called for updating the database. """ # parent class constructor @@ -71,9 +75,6 @@ class N2VCJujuConnector(N2VCConnector): fs=fs, log=log, loop=loop, - url=url, - username=username, - vca_config=vca_config, on_update_db=on_update_db, ) @@ -84,113 +85,29 @@ class N2VCJujuConnector(N2VCConnector): self.log.info("Initializing N2VC juju connector...") - """ - ############################################################## - # check arguments - ############################################################## - """ - - # juju URL - if url is None: - raise N2VCBadArgumentsException("Argument url is mandatory", ["url"]) - url_parts = url.split(":") - if len(url_parts) != 2: - raise N2VCBadArgumentsException( - "Argument url: bad format (localhost:port) -> {}".format(url), ["url"] - ) - self.hostname = url_parts[0] - try: - self.port = int(url_parts[1]) - except ValueError: - raise N2VCBadArgumentsException( - "url port must be a number -> {}".format(url), ["url"] - ) - - # juju USERNAME - if username is None: - raise N2VCBadArgumentsException( - "Argument username is mandatory", ["username"] - ) - - # juju CONFIGURATION - if vca_config is None: - raise N2VCBadArgumentsException( - "Argument vca_config is mandatory", ["vca_config"] - ) - - if "secret" in vca_config: - self.secret = vca_config["secret"] - else: - raise N2VCBadArgumentsException( - "Argument vca_config.secret is mandatory", ["vca_config.secret"] - ) - - # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub - # if exists, it will be written in lcm container: _create_juju_public_key() - if "public_key" in vca_config: - self.public_key = vca_config["public_key"] - else: - self.public_key = None - - # TODO: Verify ca_cert is valid before using. VCA will crash - # if the ca_cert isn't formatted correctly. - - self.ca_cert = vca_config.get("ca_cert") - if self.ca_cert: - self.ca_cert = base64_to_cacert(vca_config["ca_cert"]) - - if "api_proxy" in vca_config and vca_config["api_proxy"] != "": - self.api_proxy = vca_config["api_proxy"] - self.log.debug( - "api_proxy for native charms configured: {}".format(self.api_proxy) - ) - else: - self.warning( - "api_proxy is not configured" - ) - self.api_proxy = None - - model_config = ModelConfig(vca_config) - - self.cloud = vca_config.get('cloud') - self.k8s_cloud = None - if "k8s_cloud" in vca_config: - self.k8s_cloud = vca_config.get("k8s_cloud") - self.log.debug('Arguments have been checked') - - # juju data - self.controller = None # it will be filled when connect to juju - self.juju_models = {} # model objects for every model_name - self.juju_observers = {} # model observers for every model_name - self._connecting = ( - False # while connecting to juju (to avoid duplicate connections) - ) - self._authenticated = ( - False # it will be True when juju connection be stablished - ) - self._creating_model = False # True during model creation - self.libjuju = Libjuju( - endpoint=self.url, - api_proxy=self.api_proxy, - username=self.username, - password=self.secret, - cacert=self.ca_cert, - loop=self.loop, - log=self.log, - db=self.db, - n2vc=self, - model_config=model_config, - ) - - # create juju pub key file in lcm container at - # ./local/share/juju/ssh/juju_id_rsa.pub - self._create_juju_public_key() + db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri") + self._store = MotorStore(db_uri) + self.loading_libjuju = asyncio.Lock(loop=self.loop) self.log.info("N2VC juju connector initialized") - async def get_status(self, namespace: str, yaml_format: bool = True): + async def get_status( + self, namespace: str, yaml_format: bool = True, vca_id: str = None + ): + """ + Get status from all juju models from a VCA + + :param namespace: we obtain ns from namespace + :param yaml_format: returns a yaml string + :param: vca_id: VCA ID from which the status will be retrieved. + """ + # TODO: Review where is this function used. It is not optimal at all to get the status + # from all the juju models of a particular VCA. Additionally, these models might + # not have been deployed by OSM, in that case we are getting information from + # deployments outside of OSM's scope. # self.log.info('Getting NS status. namespace: {}'.format(namespace)) + libjuju = await self._get_libjuju(vca_id) _nsi_id, ns_id, _vnf_id, _vdu_id, _vdu_count = self._get_namespace_components( namespace=namespace @@ -203,33 +120,38 @@ class N2VCJujuConnector(N2VCConnector): raise N2VCBadArgumentsException(msg, ["namespace"]) status = {} - models = await self.libjuju.list_models(contains=ns_id) + models = await libjuju.list_models(contains=ns_id) for m in models: - status[m] = await self.libjuju.get_model_status(m) + status[m] = await libjuju.get_model_status(m) + if yaml_format: return obj_to_yaml(status) else: return obj_to_dict(status) - async def update_vca_status(self, vcastatus: dict): + async def update_vca_status(self, vcastatus: dict, vca_id: str = None): """ Add all configs, actions, executed actions of all applications in a model to vcastatus dict. + :param vcastatus: dict containing vcaStatus + :param: vca_id: VCA ID + :return: None """ try: + libjuju = await self._get_libjuju(vca_id) for model_name in vcastatus: # Adding executed actions vcastatus[model_name]["executedActions"] = \ - await self.libjuju.get_executed_actions(model_name) + await libjuju.get_executed_actions(model_name) for application in vcastatus[model_name]["applications"]: # Adding application actions vcastatus[model_name]["applications"][application]["actions"] = \ - await self.libjuju.get_actions(application, model_name) + await libjuju.get_actions(application, model_name) # Adding application configs vcastatus[model_name]["applications"][application]["configs"] = \ - await self.libjuju.get_application_configs(model_name, application) + await libjuju.get_application_configs(model_name, application) except Exception as e: self.log.debug("Error in updating vca status: {}".format(str(e))) @@ -240,15 +162,34 @@ class N2VCJujuConnector(N2VCConnector): reuse_ee_id: str = None, progress_timeout: float = None, total_timeout: float = None, - cloud_name: str = None, - credential_name: str = None, + vca_id: str = None, ) -> (str, dict): + """ + Create an Execution Environment. Returns when it is created or raises an + exception on failing + + :param: namespace: Contains a dot separate string. + LCM will use: []...[-] + :param: db_dict: where to write to database when the status changes. + It contains a dictionary with {collection: str, filter: {}, path: str}, + e.g. {collection: "nsrs", filter: {_id: , path: + "_admin.deployed.VCA.3"} + :param: reuse_ee_id: ee id from an older execution. It allows us to reuse an + older environment + :param: progress_timeout: Progress timeout + :param: total_timeout: Total timeout + :param: vca_id: VCA ID + + :returns: id of the new execution environment and credentials for it + (credentials can contains hostname, username, etc depending on underlying cloud) + """ self.log.info( "Creating execution environment. namespace: {}, reuse_ee_id: {}".format( namespace, reuse_ee_id ) ) + libjuju = await self._get_libjuju(vca_id) machine_id = None if reuse_ee_id: @@ -276,15 +217,12 @@ class N2VCJujuConnector(N2VCConnector): # create or reuse a new juju machine try: - if not await self.libjuju.model_exists(model_name): - cloud = cloud_name or self.cloud - credential = credential_name or cloud_name if cloud_name else self.cloud - await self.libjuju.add_model( + if not await libjuju.model_exists(model_name): + await libjuju.add_model( model_name, - cloud_name=cloud, - credential_name=credential + libjuju.vca_connection.lxd_cloud, ) - machine, new = await self.libjuju.create_machine( + machine, new = await libjuju.create_machine( model_name=model_name, machine_id=machine_id, db_dict=db_dict, @@ -328,15 +266,34 @@ class N2VCJujuConnector(N2VCConnector): db_dict: dict, progress_timeout: float = None, total_timeout: float = None, - cloud_name: str = None, - credential_name: str = None, + vca_id: str = None, ) -> str: - + """ + Register an existing execution environment at the VCA + + :param: namespace: Contains a dot separate string. + LCM will use: []...[-] + :param: credentials: credentials to access the existing execution environment + (it can contains hostname, username, path to private key, + etc depending on underlying cloud) + :param: db_dict: where to write to database when the status changes. + It contains a dictionary with {collection: str, filter: {}, path: str}, + e.g. {collection: "nsrs", filter: {_id: , path: + "_admin.deployed.VCA.3"} + :param: reuse_ee_id: ee id from an older execution. It allows us to reuse an + older environment + :param: progress_timeout: Progress timeout + :param: total_timeout: Total timeout + :param: vca_id: VCA ID + + :returns: id of the execution environment + """ self.log.info( "Registering execution environment. namespace={}, credentials={}".format( namespace, credentials ) ) + libjuju = await self._get_libjuju(vca_id) if credentials is None: raise N2VCBadArgumentsException( @@ -371,15 +328,12 @@ class N2VCJujuConnector(N2VCConnector): # register machine on juju try: - if not await self.libjuju.model_exists(model_name): - cloud = cloud_name or self.cloud - credential = credential_name or cloud_name if cloud_name else self.cloud - await self.libjuju.add_model( + if not await libjuju.model_exists(model_name): + await libjuju.add_model( model_name, - cloud_name=cloud, - credential_name=credential + libjuju.vca_connection.lxd_cloud, ) - machine_id = await self.libjuju.provision_machine( + machine_id = await libjuju.provision_machine( model_name=model_name, hostname=hostname, username=username, @@ -416,7 +370,29 @@ class N2VCJujuConnector(N2VCConnector): total_timeout: float = None, config: dict = None, num_units: int = 1, + vca_id: str = None, ): + """ + Install the software inside the execution environment identified by ee_id + + :param: ee_id: the id of the execution environment returned by + create_execution_environment or register_execution_environment + :param: artifact_path: where to locate the artifacts (parent folder) using + the self.fs + the final artifact path will be a combination of this + artifact_path and additional string from the config_dict + (e.g. charm name) + :param: db_dict: where to write into database when the status changes. + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} + :param: progress_timeout: Progress timeout + :param: total_timeout: Total timeout + :param: config: Dictionary with deployment config information. + :param: num_units: Number of units to deploy of a particular charm. + :param: vca_id: VCA ID + """ self.log.info( ( @@ -424,6 +400,7 @@ class N2VCJujuConnector(N2VCConnector): "artifact path: {}, db_dict: {}" ).format(ee_id, artifact_path, db_dict) ) + libjuju = await self._get_libjuju(vca_id) # check arguments if ee_id is None or len(ee_id) == 0: @@ -473,7 +450,7 @@ class N2VCJujuConnector(N2VCConnector): full_path = self.fs.path + "/" + artifact_path try: - await self.libjuju.deploy_charm( + await libjuju.deploy_charm( model_name=model_name, application_name=application_name, path=full_path, @@ -500,8 +477,7 @@ class N2VCJujuConnector(N2VCConnector): progress_timeout: float = None, total_timeout: float = None, config: dict = None, - cloud_name: str = None, - credential_name: str = None, + vca_id: str = None, ) -> str: """ Install a k8s proxy charm @@ -517,56 +493,54 @@ class N2VCJujuConnector(N2VCConnector): {collection: , filter: {}, path: }, e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} - :param float progress_timeout: - :param float total_timeout: + :param: progress_timeout: Progress timeout + :param: total_timeout: Total timeout :param config: Dictionary with additional configuration - :param cloud_name: Cloud Name in which the charms will be deployed - :param credential_name: Credential Name to use in the cloud_name. - If not set, cloud_name will be used as credential_name + :param vca_id: VCA ID :returns ee_id: execution environment id. """ - self.log.info('Installing k8s proxy charm: {}, artifact path: {}, db_dict: {}' - .format(charm_name, artifact_path, db_dict)) - - if not self.k8s_cloud: - raise JujuK8sProxycharmNotSupported("There is not k8s_cloud available") + self.log.info( + "Installing k8s proxy charm: {}, artifact path: {}, db_dict: {}".format( + charm_name, artifact_path, db_dict + ) + ) + libjuju = await self._get_libjuju(vca_id) if artifact_path is None or len(artifact_path) == 0: raise N2VCBadArgumentsException( message="artifact_path is mandatory", bad_args=["artifact_path"] ) if db_dict is None: - raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict']) + raise N2VCBadArgumentsException( + message="db_dict is mandatory", bad_args=["db_dict"] + ) # remove // in charm path - while artifact_path.find('//') >= 0: - artifact_path = artifact_path.replace('//', '/') + while artifact_path.find("//") >= 0: + artifact_path = artifact_path.replace("//", "/") # check charm path if not self.fs.file_exists(artifact_path, mode="dir"): - msg = 'artifact path does not exist: {}'.format(artifact_path) - raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path']) + msg = "artifact path does not exist: {}".format(artifact_path) + raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"]) - if artifact_path.startswith('/'): + if artifact_path.startswith("/"): full_path = self.fs.path + artifact_path else: - full_path = self.fs.path + '/' + artifact_path + full_path = self.fs.path + "/" + artifact_path _, ns_id, _, _, _ = self._get_namespace_components(namespace=namespace) - model_name = '{}-k8s'.format(ns_id) - if not await self.libjuju.model_exists(model_name): - cloud = cloud_name or self.k8s_cloud - credential = credential_name or cloud_name if cloud_name else self.k8s_cloud - await self.libjuju.add_model( + model_name = "{}-k8s".format(ns_id) + if not await libjuju.model_exists(model_name): + await libjuju.add_model( model_name, - cloud_name=cloud, - credential_name=credential + libjuju.vca_connection.k8s_cloud, ) application_name = self._get_application_name(namespace) try: - await self.libjuju.deploy_charm( + await libjuju.deploy_charm( model_name=model_name, application_name=application_name, path=full_path, @@ -574,12 +548,12 @@ class N2VCJujuConnector(N2VCConnector): db_dict=db_dict, progress_timeout=progress_timeout, total_timeout=total_timeout, - config=config + config=config, ) except Exception as e: - raise N2VCException(message='Error deploying charm: {}'.format(e)) + raise N2VCException(message="Error deploying charm: {}".format(e)) - self.log.info('K8s proxy charm installed') + self.log.info("K8s proxy charm installed") ee_id = N2VCJujuConnector._build_ee_id( model_name=model_name, application_name=application_name, @@ -596,13 +570,33 @@ class N2VCJujuConnector(N2VCConnector): db_dict: dict, progress_timeout: float = None, total_timeout: float = None, + vca_id: str = None, ) -> str: + """ + Get Execution environment ssh public key + + :param: ee_id: the id of the execution environment returned by + create_execution_environment or register_execution_environment + :param: db_dict: where to write into database when the status changes. + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} + :param: progress_timeout: Progress timeout + :param: total_timeout: Total timeout + :param vca_id: VCA ID + :returns: public key of the execution environment + For the case of juju proxy charm ssh-layered, it is the one + returned by 'get-ssh-public-key' primitive. + It raises a N2VC exception if fails + """ self.log.info( ( "Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}" ).format(ee_id, db_dict) ) + libjuju = await self._get_libjuju(vca_id) # check arguments if ee_id is None or len(ee_id) == 0: @@ -643,7 +637,7 @@ class N2VCJujuConnector(N2VCConnector): # execute action: generate-ssh-key try: - output, _status = await self.libjuju.execute_action( + output, _status = await libjuju.execute_action( model_name=model_name, application_name=application_name, action_name="generate-ssh-key", @@ -660,7 +654,7 @@ class N2VCJujuConnector(N2VCConnector): # execute action: get-ssh-public-key try: - output, _status = await self.libjuju.execute_action( + output, _status = await libjuju.execute_action( model_name=model_name, application_name=application_name, action_name="get-ssh-public-key", @@ -676,18 +670,44 @@ class N2VCJujuConnector(N2VCConnector): # return public key if exists return output["pubkey"] if "pubkey" in output else output - async def get_metrics(self, model_name: str, application_name: str) -> dict: - return await self.libjuju.get_metrics(model_name, application_name) + async def get_metrics( + self, model_name: str, application_name: str, vca_id: str = None + ) -> dict: + """ + Get metrics from application + + :param: model_name: Model name + :param: application_name: Application name + :param: vca_id: VCA ID + + :return: Dictionary with obtained metrics + """ + libjuju = await self._get_libjuju(vca_id) + return await libjuju.get_metrics(model_name, application_name) async def add_relation( - self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str + self, + ee_id_1: str, + ee_id_2: str, + endpoint_1: str, + endpoint_2: str, + vca_id: str = None, ): + """ + Add relation between two charmed endpoints + :param: ee_id_1: The id of the first execution environment + :param: ee_id_2: The id of the second execution environment + :param: endpoint_1: The endpoint in the first execution environment + :param: endpoint_2: The endpoint in the second execution environment + :param: vca_id: VCA ID + """ self.log.debug( "adding new relation between {} and {}, endpoints: {}, {}".format( ee_id_1, ee_id_2, endpoint_1, endpoint_2 ) ) + libjuju = await self._get_libjuju(vca_id) # check arguments if not ee_id_1: @@ -721,7 +741,7 @@ class N2VCJujuConnector(N2VCConnector): # add juju relations between two applications try: - await self.libjuju.add_relation( + await libjuju.add_relation( model_name=model_1, endpoint_1="{}:{}".format(app_1, endpoint_1), endpoint_2="{}:{}".format(app_2, endpoint_2), @@ -743,9 +763,25 @@ class N2VCJujuConnector(N2VCConnector): raise MethodNotImplemented() async def delete_namespace( - self, namespace: str, db_dict: dict = None, total_timeout: float = None + self, + namespace: str, + db_dict: dict = None, + total_timeout: float = None, + vca_id: str = None, ): + """ + Remove a network scenario and its execution environments + :param: namespace: []. + :param: db_dict: where to write into database when the status changes. + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} + :param: total_timeout: Total timeout + :param: vca_id: VCA ID + """ self.log.info("Deleting namespace={}".format(namespace)) + libjuju = await self._get_libjuju(vca_id) # check arguments if namespace is None: @@ -758,9 +794,9 @@ class N2VCJujuConnector(N2VCConnector): ) if ns_id is not None: try: - models = await self.libjuju.list_models(contains=ns_id) + models = await libjuju.list_models(contains=ns_id) for model in models: - await self.libjuju.destroy_model( + await libjuju.destroy_model( model_name=model, total_timeout=total_timeout ) except Exception as e: @@ -775,10 +811,27 @@ class N2VCJujuConnector(N2VCConnector): self.log.info("Namespace {} deleted".format(namespace)) async def delete_execution_environment( - self, ee_id: str, db_dict: dict = None, total_timeout: float = None, - scaling_in: bool = False + self, + ee_id: str, + db_dict: dict = None, + total_timeout: float = None, + scaling_in: bool = False, + vca_id: str = None, ): + """ + Delete an execution environment + :param str ee_id: id of the execution environment to delete + :param dict db_dict: where to write into database when the status changes. + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} + :param: total_timeout: Total timeout + :param: scaling_in: Boolean to indicate if is it a scaling in operation + :param: vca_id: VCA ID + """ self.log.info("Deleting execution environment ee_id={}".format(ee_id)) + libjuju = await self._get_libjuju(vca_id) # check arguments if ee_id is None: @@ -793,13 +846,13 @@ class N2VCJujuConnector(N2VCConnector): if not scaling_in: # destroy the model # TODO: should this be removed? - await self.libjuju.destroy_model( + await libjuju.destroy_model( model_name=model_name, total_timeout=total_timeout, ) else: # destroy the application - await self.libjuju.destroy_application( + await libjuju.destroy_application( model_name=model_name, application_name=application_name, total_timeout=total_timeout, @@ -821,13 +874,34 @@ class N2VCJujuConnector(N2VCConnector): db_dict: dict = None, progress_timeout: float = None, total_timeout: float = None, + vca_id: str = None, ) -> str: + """ + Execute a primitive in the execution environment + + :param: ee_id: the one returned by create_execution_environment or + register_execution_environment + :param: primitive_name: must be one defined in the software. There is one + called 'config', where, for the proxy case, the 'credentials' of VM are + provided + :param: params_dict: parameters of the action + :param: db_dict: where to write into database when the status changes. + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} + :param: progress_timeout: Progress timeout + :param: total_timeout: Total timeout + :param: vca_id: VCA ID + :returns str: primitive result, if ok. It raises exceptions in case of fail + """ self.log.info( "Executing primitive: {} on ee: {}, params: {}".format( primitive_name, ee_id, params_dict ) ) + libjuju = await self._get_libjuju(vca_id) # check arguments if ee_id is None or len(ee_id) == 0: @@ -858,13 +932,14 @@ class N2VCJujuConnector(N2VCConnector): if primitive_name == "config": # Special case: config primitive try: - await self.libjuju.configure_application( + await libjuju.configure_application( model_name=model_name, application_name=application_name, config=params_dict, ) - actions = await self.libjuju.get_actions( - application_name=application_name, model_name=model_name, + actions = await libjuju.get_actions( + application_name=application_name, + model_name=model_name, ) self.log.debug( "Application {} has these actions: {}".format( @@ -878,7 +953,7 @@ class N2VCJujuConnector(N2VCConnector): for _ in range(num_retries): try: self.log.debug("Executing action verify-ssh-credentials...") - output, ok = await self.libjuju.execute_action( + output, ok = await libjuju.execute_action( model_name=model_name, application_name=application_name, action_name="verify-ssh-credentials", @@ -920,7 +995,7 @@ class N2VCJujuConnector(N2VCConnector): return "CONFIG OK" else: try: - output, status = await self.libjuju.execute_action( + output, status = await libjuju.execute_action( model_name=model_name, application_name=application_name, action_name=primitive_name, @@ -944,13 +1019,20 @@ class N2VCJujuConnector(N2VCConnector): primitive_name=primitive_name, ) - async def disconnect(self): + async def disconnect(self, vca_id: str = None): + """ + Disconnect from VCA + + :param: vca_id: VCA ID + """ self.log.info("closing juju N2VC...") + libjuju = await self._get_libjuju(vca_id) try: - await self.libjuju.disconnect() + await libjuju.disconnect() except Exception as e: raise N2VCConnectionException( - message="Error disconnecting controller: {}".format(e), url=self.url + message="Error disconnecting controller: {}".format(e), + url=libjuju.vca_connection.data.endpoints, ) """ @@ -959,6 +1041,31 @@ class N2VCJujuConnector(N2VCConnector): #################################################################################### """ + async def _get_libjuju(self, vca_id: str = None) -> Libjuju: + """ + Get libjuju object + + :param: vca_id: VCA ID + If None, get a libjuju object with a Connection to the default VCA + Else, geta libjuju object with a Connection to the specified VCA + """ + if not vca_id: + while self.loading_libjuju.locked(): + await asyncio.sleep(0.1) + if not self.libjuju: + async with self.loading_libjuju: + vca_connection = await get_connection(self._store) + self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log) + return self.libjuju + else: + vca_connection = await get_connection(self._store, vca_id) + return Libjuju( + vca_connection, + loop=self.loop, + log=self.log, + n2vc=self, + ) + def _write_ee_id_db(self, db_dict: dict, ee_id: str): # write ee_id to database: _admin.deployed.VCA.x @@ -1046,38 +1153,6 @@ class N2VCJujuConnector(N2VCConnector): return N2VCJujuConnector._format_app_name(application_name) - def _create_juju_public_key(self): - """Recreate the Juju public key on lcm container, if needed - Certain libjuju commands expect to be run from the same machine as Juju - is bootstrapped to. This method will write the public key to disk in - that location: ~/.local/share/juju/ssh/juju_id_rsa.pub - """ - - # Make sure that we have a public key before writing to disk - if self.public_key is None or len(self.public_key) == 0: - if "OSMLCM_VCA_PUBKEY" in os.environ: - self.public_key = os.getenv("OSMLCM_VCA_PUBKEY", "") - if len(self.public_key) == 0: - return - else: - return - - pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~")) - file_path = "{}/juju_id_rsa.pub".format(pk_path) - self.log.debug( - "writing juju public key to file:\n{}\npublic key: {}".format( - file_path, self.public_key - ) - ) - if not os.path.exists(pk_path): - # create path and write file - os.makedirs(pk_path) - with open(file_path, "w") as f: - self.log.debug("Creating juju public key file: {}".format(file_path)) - f.write(self.public_key) - else: - self.log.debug("juju public key file already exists: {}".format(file_path)) - @staticmethod def _format_model_name(name: str) -> str: """Format the name of the model. @@ -1127,3 +1202,14 @@ class N2VCJujuConnector(N2VCConnector): app_name = "z" + app_name return app_name + + async def validate_vca(self, vca_id: str): + """ + Validate a VCA by connecting/disconnecting to/from it + + :param: vca_id: VCA ID + """ + vca_connection = await get_connection(self._store, vca_id=vca_id) + libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self) + controller = await libjuju.get_controller() + await libjuju.disconnect_controller(controller) diff --git a/n2vc/store.py b/n2vc/store.py new file mode 100644 index 0000000..b827d51 --- /dev/null +++ b/n2vc/store.py @@ -0,0 +1,390 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import asyncio +from base64 import b64decode +import re +import typing + +from Crypto.Cipher import AES +from motor.motor_asyncio import AsyncIOMotorClient +from n2vc.config import EnvironConfig +from n2vc.vca.connection_data import ConnectionData +from osm_common.dbmongo import DbMongo, DbException + +DB_NAME = "osm" + + +class Store(abc.ABC): + @abc.abstractmethod + async def get_vca_connection_data(self, vca_id: str) -> ConnectionData: + """ + Get VCA connection data + + :param: vca_id: VCA ID + + :returns: ConnectionData with the information of the database + """ + + @abc.abstractmethod + async def update_vca_endpoints(self, hosts: typing.List[str], vca_id: str): + """ + Update VCA endpoints + + :param: endpoints: List of endpoints to write in the database + :param: vca_id: VCA ID + """ + + @abc.abstractmethod + async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]: + """ + Get list if VCA endpoints + + :param: vca_id: VCA ID + + :returns: List of endpoints + """ + + @abc.abstractmethod + async def get_vca_id(self, vim_id: str = None) -> str: + """ + Get VCA id for a VIM account + + :param: vim_id: Vim account ID + """ + + +class DbMongoStore(Store): + def __init__(self, db: DbMongo): + """ + Constructor + + :param: db: osm_common.dbmongo.DbMongo object + """ + self.db = db + + async def get_vca_connection_data(self, vca_id: str) -> ConnectionData: + """ + Get VCA connection data + + :param: vca_id: VCA ID + + :returns: ConnectionData with the information of the database + """ + data = self.db.get_one("vca", q_filter={"_id": vca_id}) + self.db.encrypt_decrypt_fields( + data, + "decrypt", + ["secret", "cacert"], + schema_version=data["schema_version"], + salt=data["_id"], + ) + return ConnectionData(**data) + + async def update_vca_endpoints( + self, endpoints: typing.List[str], vca_id: str = None + ): + """ + Update VCA endpoints + + :param: endpoints: List of endpoints to write in the database + :param: vca_id: VCA ID + """ + if vca_id: + data = self.db.get_one("vca", q_filter={"_id": vca_id}) + data["endpoints"] = endpoints + self._update("vca", vca_id, data) + else: + # The default VCA. Data for the endpoints is in a different place + juju_info = self._get_juju_info() + # If it doesn't, then create it + if not juju_info: + try: + self.db.create( + "vca", + {"_id": "juju"}, + ) + except DbException as e: + # Racing condition: check if another N2VC worker has created it + juju_info = self._get_juju_info() + if not juju_info: + raise e + self.db.set_one( + "vca", + {"_id": "juju"}, + {"api_endpoints": endpoints}, + ) + + async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]: + """ + Get list if VCA endpoints + + :param: vca_id: VCA ID + + :returns: List of endpoints + """ + endpoints = [] + if vca_id: + endpoints = self.get_vca_connection_data(vca_id).endpoints + else: + juju_info = self._get_juju_info() + if juju_info and "api_endpoints" in juju_info: + endpoints = juju_info["api_endpoints"] + return endpoints + + async def get_vca_id(self, vim_id: str = None) -> str: + """ + Get VCA ID from the database for a given VIM account ID + + :param: vim_id: VIM account ID + """ + return ( + self.db.get_one( + "vim_accounts", + q_filter={"_id": vim_id}, + fail_on_empty=False, + ).get("vca") + if vim_id + else None + ) + + def _update(self, collection: str, id: str, data: dict): + """ + Update object in database + + :param: collection: Collection name + :param: id: ID of the object + :param: data: Object data + """ + self.db.replace( + collection, + id, + data, + ) + + def _get_juju_info(self): + """Get Juju information (the default VCA) from the admin collection""" + return self.db.get_one( + "vca", + q_filter={"_id": "juju"}, + fail_on_empty=False, + ) + + +class MotorStore(Store): + def __init__(self, uri: str, loop=None): + """ + Constructor + + :param: uri: Connection string to connect to the database. + :param: loop: Asyncio Loop + """ + self._client = AsyncIOMotorClient(uri) + self.loop = loop or asyncio.get_event_loop() + self._secret_key = None + self._config = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]) + + @property + def _database(self): + return self._client[DB_NAME] + + @property + def _vca_collection(self): + return self._database["vca"] + + @property + def _admin_collection(self): + return self._database["admin"] + + @property + def _vim_accounts_collection(self): + return self._database["vim_accounts"] + + async def get_vca_connection_data(self, vca_id: str) -> ConnectionData: + """ + Get VCA connection data + + :param: vca_id: VCA ID + + :returns: ConnectionData with the information of the database + """ + data = await self._vca_collection.find_one({"_id": vca_id}) + if not data: + raise Exception("vca with id {} not found".format(vca_id)) + await self.decrypt_fields( + data, + ["secret", "cacert"], + schema_version=data["schema_version"], + salt=data["_id"], + ) + return ConnectionData(**data) + + async def update_vca_endpoints( + self, endpoints: typing.List[str], vca_id: str = None + ): + """ + Update VCA endpoints + + :param: endpoints: List of endpoints to write in the database + :param: vca_id: VCA ID + """ + if vca_id: + data = await self._vca_collection.find_one({"_id": vca_id}) + data["endpoints"] = endpoints + await self._vca_collection.replace_one({"_id": vca_id}, data) + else: + # The default VCA. Data for the endpoints is in a different place + juju_info = await self._get_juju_info() + # If it doesn't, then create it + if not juju_info: + try: + await self._admin_collection.insert_one({"_id": "juju"}) + except Exception as e: + # Racing condition: check if another N2VC worker has created it + juju_info = await self._get_juju_info() + if not juju_info: + raise e + + await self._admin_collection.replace_one( + {"_id": "juju"}, {"api_endpoints": endpoints} + ) + + async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]: + """ + Get list if VCA endpoints + + :param: vca_id: VCA ID + + :returns: List of endpoints + """ + endpoints = [] + if vca_id: + endpoints = (await self.get_vca_connection_data(vca_id)).endpoints + else: + juju_info = await self._get_juju_info() + if juju_info and "api_endpoints" in juju_info: + endpoints = juju_info["api_endpoints"] + return endpoints + + async def get_vca_id(self, vim_id: str = None) -> str: + """ + Get VCA ID from the database for a given VIM account ID + + :param: vim_id: VIM account ID + """ + vca_id = None + if vim_id: + vim_account = await self._vim_accounts_collection.find_one({"_id": vim_id}) + if vim_account and "vca" in vim_account: + vca_id = vim_account["vca"] + return vca_id + + async def _get_juju_info(self): + """Get Juju information (the default VCA) from the admin collection""" + return await self._admin_collection.find_one({"_id": "juju"}) + + # DECRYPT METHODS + async def decrypt_fields( + self, + item: dict, + fields: typing.List[str], + schema_version: str = None, + salt: str = None, + ): + """ + Decrypt fields + + Decrypt fields from a dictionary. Follows the same logic as in osm_common. + + :param: item: Dictionary with the keys to be decrypted + :param: fields: List of keys to decrypt + :param: schema version: Schema version. (i.e. 1.11) + :param: salt: Salt for the decryption + """ + flags = re.I + + async def process(_item): + if isinstance(_item, list): + for elem in _item: + await process(elem) + elif isinstance(_item, dict): + for key, val in _item.items(): + if isinstance(val, str): + if any(re.search(f, key, flags) for f in fields): + _item[key] = await self.decrypt(val, schema_version, salt) + else: + await process(val) + + await process(item) + + async def decrypt(self, value, schema_version=None, salt=None): + """ + Decrypt an encrypted value + :param value: value to be decrypted. It is a base64 string + :param schema_version: used for known encryption method used. If None or '1.0' no encryption has been done. + If '1.1' symmetric AES encryption has been done + :param salt: optional salt to be used + :return: Plain content of value + """ + if not await self.secret_key or not schema_version or schema_version == "1.0": + return value + else: + secret_key = self._join_secret_key(salt) + encrypted_msg = b64decode(value) + cipher = AES.new(secret_key) + decrypted_msg = cipher.decrypt(encrypted_msg) + try: + unpadded_private_msg = decrypted_msg.decode().rstrip("\0") + except UnicodeDecodeError: + raise DbException( + "Cannot decrypt information. Are you using same COMMONKEY in all OSM components?", + http_code=500, + ) + return unpadded_private_msg + + def _join_secret_key(self, update_key: typing.Any): + """ + Join secret key + + :param: update_key: str or bytes with the to update + """ + if isinstance(update_key, str): + update_key_bytes = update_key.encode() + else: + update_key_bytes = update_key + new_secret_key = ( + bytearray(self._secret_key) if self._secret_key else bytearray(32) + ) + for i, b in enumerate(update_key_bytes): + new_secret_key[i % 32] ^= b + return bytes(new_secret_key) + + @property + async def secret_key(self): + if self._secret_key: + return self._secret_key + else: + if self.database_key: + self._secret_key = self._join_secret_key(self.database_key) + version_data = await self._admin_collection.find_one({"_id": "version"}) + if version_data and version_data.get("serial"): + self._secret_key = self._join_secret_key( + b64decode(version_data["serial"]) + ) + return self._secret_key + + @property + def database_key(self): + return self._config["database_commonkey"] diff --git a/n2vc/tests/unit/test_config.py b/n2vc/tests/unit/test_config.py new file mode 100644 index 0000000..9a4af07 --- /dev/null +++ b/n2vc/tests/unit/test_config.py @@ -0,0 +1,58 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase +from unittest.mock import patch + + +from n2vc.config import EnvironConfig, ModelConfig, MODEL_CONFIG_KEYS + + +def generate_os_environ_dict(config, prefix): + return {f"{prefix}{k.upper()}": v for k, v in config.items()} + + +class TestEnvironConfig(TestCase): + def setUp(self): + self.config = {"host": "1.2.3.4", "port": "17070", "k8s_cloud": "k8s"} + + @patch("os.environ.items") + def test_environ_config_lcm(self, mock_environ_items): + envs = generate_os_environ_dict(self.config, "OSMLCM_VCA_") + envs["not_valid_env"] = "something" + mock_environ_items.return_value = envs.items() + config = EnvironConfig() + self.assertEqual(config, self.config) + + @patch("os.environ.items") + def test_environ_config_mon(self, mock_environ_items): + envs = generate_os_environ_dict(self.config, "OSMMON_VCA_") + envs["not_valid_env"] = "something" + mock_environ_items.return_value = envs.items() + config = EnvironConfig() + self.assertEqual(config, self.config) + + +class TestModelConfig(TestCase): + def setUp(self): + self.config = { + f'model_config_{model_key.replace("-", "_")}': "somevalue" + for model_key in MODEL_CONFIG_KEYS + } + self.config["model_config_invalid"] = "something" + self.model_config = {model_key: "somevalue" for model_key in MODEL_CONFIG_KEYS} + + def test_model_config(self): + model_config = ModelConfig(self.config) + self.assertEqual(model_config, self.model_config) diff --git a/n2vc/tests/unit/test_connection.py b/n2vc/tests/unit/test_connection.py new file mode 100644 index 0000000..c7f0bb4 --- /dev/null +++ b/n2vc/tests/unit/test_connection.py @@ -0,0 +1,68 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from unittest import TestCase +from unittest.mock import Mock, patch + + +from n2vc.tests.unit.utils import AsyncMock +from n2vc.vca import connection + + +class TestConnection(TestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + self.store = AsyncMock() + + def test_load_from_store(self): + self.loop.run_until_complete(connection.get_connection(self.store, "vim_id")) + + self.store.get_vca_connection_data.assert_called_once() + + def test_cloud_properties(self): + conn = self.loop.run_until_complete( + connection.get_connection(self.store, "vim_id") + ) + conn._data = Mock() + conn._data.lxd_cloud = "name" + conn._data.k8s_cloud = "name" + conn._data.lxd_credentials = "credential" + conn._data.k8s_credentials = "credential" + + self.assertEqual(conn.lxd_cloud.name, "name") + self.assertEqual(conn.lxd_cloud.credential_name, "credential") + self.assertEqual(conn.k8s_cloud.name, "name") + self.assertEqual(conn.k8s_cloud.credential_name, "credential") + + @patch("n2vc.vca.connection.EnvironConfig") + @patch("n2vc.vca.connection_data.base64_to_cacert") + def test_load_from_env(self, mock_base64_to_cacert, mock_env): + mock_base64_to_cacert.return_value = "cacert" + mock_env.return_value = { + "endpoints": "1.2.3.4:17070", + "user": "user", + "secret": "secret", + "cacert": "cacert", + "pubkey": "pubkey", + "cloud": "cloud", + "credentials": "credentials", + "k8s_cloud": "k8s_cloud", + "k8s_credentials": "k8s_credentials", + "model_config": {}, + "api-proxy": "api_proxy", + } + self.store.get_vca_endpoints.return_value = ["1.2.3.5:17070"] + self.loop.run_until_complete(connection.get_connection(self.store)) + self.store.get_vca_connection_data.assert_not_called() diff --git a/n2vc/tests/unit/test_juju_watcher.py b/n2vc/tests/unit/test_juju_watcher.py index d333b33..5f81274 100644 --- a/n2vc/tests/unit/test_juju_watcher.py +++ b/n2vc/tests/unit/test_juju_watcher.py @@ -45,6 +45,7 @@ class JujuWatcherTest(asynctest.TestCase): def test_model_watcher(self, allwatcher): tests = Deltas allwatcher.return_value = FakeWatcher() + n2vc = AsyncMock() for test in tests: with self.assertRaises(asyncio.TimeoutError): allwatcher.return_value.delta_to_return = [test.delta] @@ -55,12 +56,12 @@ class JujuWatcherTest(asynctest.TestCase): test.filter.entity_type, timeout=0, db_dict={"something"}, - n2vc=self.n2vc, + n2vc=n2vc, + vca_id=None, ) ) - self.assertEqual(self.n2vc.last_written_values, test.db.data) - self.n2vc.last_written_values = None + n2vc.write_app_status_to_db.assert_called() @mock.patch("n2vc.juju_watcher.asyncio.wait") def test_wait_for(self, wait): diff --git a/n2vc/tests/unit/test_k8s_juju_conn.py b/n2vc/tests/unit/test_k8s_juju_conn.py index f454380..208c849 100644 --- a/n2vc/tests/unit/test_k8s_juju_conn.py +++ b/n2vc/tests/unit/test_k8s_juju_conn.py @@ -23,172 +23,58 @@ from .utils import kubeconfig, FakeModel, FakeFileWrapper, AsyncMock from n2vc.exceptions import ( MethodNotImplemented, K8sException, - N2VCBadArgumentsException, ) +from n2vc.vca.connection_data import ConnectionData class K8sJujuConnTestCase(asynctest.TestCase): - @asynctest.mock.patch("juju.controller.Controller.update_endpoints") - @asynctest.mock.patch("juju.client.connector.Connector.connect") - @asynctest.mock.patch("juju.controller.Controller.connection") - @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert") @asynctest.mock.patch("n2vc.k8s_juju_conn.Libjuju") + @asynctest.mock.patch("n2vc.k8s_juju_conn.MotorStore") + @asynctest.mock.patch("n2vc.k8s_juju_conn.get_connection") + @asynctest.mock.patch("n2vc.vca.connection_data.base64_to_cacert") def setUp( self, - mock_libjuju=None, mock_base64_to_cacert=None, - mock_connection=None, - mock_connect=None, - mock_update_endpoints=None, + mock_get_connection=None, + mock_store=None, + mock_libjuju=None, ): self.loop = asyncio.get_event_loop() - mock_libjuju.return_value = AsyncMock() - db = Mock() - vca_config = { - "secret": "secret", - "api_proxy": "api_proxy", - "cloud": "cloud", - "k8s_cloud": "k8s_cloud", - "user": "user", - "host": "1.1.1.1", - "port": 17070, - "ca_cert": "cacert", - } - + self.db = Mock() + mock_base64_to_cacert.return_value = """ + -----BEGIN CERTIFICATE----- + SOMECERT + -----END CERTIFICATE-----""" + mock_libjuju.return_value = Mock() + mock_store.return_value = AsyncMock() + mock_vca_connection = Mock() + mock_get_connection.return_value = mock_vca_connection + mock_vca_connection.data.return_value = ConnectionData( + **{ + "endpoints": ["1.2.3.4:17070"], + "user": "user", + "secret": "secret", + "cacert": "cacert", + "pubkey": "pubkey", + "lxd-cloud": "cloud", + "lxd-credentials": "credentials", + "k8s-cloud": "k8s_cloud", + "k8s-credentials": "k8s_credentials", + "model-config": {}, + "api-proxy": "api_proxy", + } + ) logging.disable(logging.CRITICAL) self.k8s_juju_conn = K8sJujuConnector( fs=fslocal.FsLocal(), - db=db, + db=self.db, log=None, loop=self.loop, - vca_config=vca_config, - on_update_db=None, - ) - - -class K8sJujuConnInitSuccessTestCase(asynctest.TestCase): - def setUp( - self, - ): - logging.disable(logging.CRITICAL) - - @asynctest.mock.patch("juju.controller.Controller.update_endpoints") - @asynctest.mock.patch("juju.client.connector.Connector.connect") - @asynctest.mock.patch("juju.controller.Controller.connection") - @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert") - @asynctest.mock.patch("n2vc.libjuju.Libjuju.__init__") - def test_success( - self, - mock_libjuju=None, - mock_base64_to_cacert=None, - mock_connection=None, - mock_connect=None, - mock_update_endpoints=None, - ): - mock_libjuju.return_value = None - loop = asyncio.get_event_loop() - log = logging.getLogger() - db = Mock() - vca_config = { - "secret": "secret", - "cloud": "cloud", - "k8s_cloud": "k8s_cloud", - "user": "user", - "host": "1.1.1.1", - "port": 17070, - "ca_cert": "cacert", - } - K8sJujuConnector( - fs=fslocal.FsLocal(), - db=db, - log=log, - loop=self.loop, - vca_config=vca_config, on_update_db=None, ) - - mock_libjuju.assert_called_once_with( - endpoint="1.1.1.1:17070", - api_proxy=None, # Not needed for k8s charms - model_config={}, - username="user", - password="secret", - cacert=mock_base64_to_cacert.return_value, - loop=loop, - log=log, - db=db, - ) - - -class K8sJujuConnectorInitFailureTestCase(asynctest.TestCase): - def setUp( - self, - ): - self.loop = asyncio.get_event_loop() - logging.disable(logging.CRITICAL) - self.vca_config = { - "secret": "secret", - "api_proxy": "api_proxy", - "cloud": "cloud", - "k8s_cloud": "k8s_cloud", - "user": "user", - "host": "1.1.1.1", - "port": 17070, - "ca_cert": "cacert", - } - - def test_missing_vca_config_host(self): - db = Mock() - self.vca_config.pop("host") - with self.assertRaises(N2VCBadArgumentsException): - self.k8s_juju_conn = K8sJujuConnector( - fs=fslocal.FsLocal(), - db=db, - log=None, - loop=self.loop, - vca_config=self.vca_config, - on_update_db=None, - ) - - def test_missing_vca_config_user(self): - db = Mock() - self.vca_config.pop("user") - with self.assertRaises(N2VCBadArgumentsException): - self.k8s_juju_conn = K8sJujuConnector( - fs=fslocal.FsLocal(), - db=db, - log=None, - loop=self.loop, - vca_config=self.vca_config, - on_update_db=None, - ) - - def test_missing_vca_config_secret(self): - db = Mock() - self.vca_config.pop("secret") - with self.assertRaises(N2VCBadArgumentsException): - self.k8s_juju_conn = K8sJujuConnector( - fs=fslocal.FsLocal(), - db=db, - log=None, - loop=self.loop, - vca_config=self.vca_config, - on_update_db=None, - ) - - def test_missing_vca_config_ca_cert(self): - db = Mock() - self.vca_config.pop("ca_cert") - with self.assertRaises(N2VCBadArgumentsException): - self.k8s_juju_conn = K8sJujuConnector( - fs=fslocal.FsLocal(), - db=db, - log=None, - loop=self.loop, - vca_config=self.vca_config, - on_update_db=None, - ) + self.k8s_juju_conn._store.get_vca_id.return_value = None + self.k8s_juju_conn.libjuju = Mock() @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class") @@ -344,11 +230,7 @@ class InstallTest(K8sJujuConnTestCase): ) ) self.assertEqual(mock_chdir.call_count, 2) - self.k8s_juju_conn.libjuju.add_model.assert_called_once_with( - model_name=self.kdu_instance, - cloud_name=self.cluster_uuid, - credential_name="cred-{}".format(self.cluster_uuid), - ) + self.k8s_juju_conn.libjuju.add_model.assert_called_once() self.k8s_juju_conn.libjuju.deploy.assert_called_once_with( "local:{}".format(self.local_bundle), model_name=self.kdu_instance, @@ -368,11 +250,7 @@ class InstallTest(K8sJujuConnTestCase): timeout=1800, ) ) - self.k8s_juju_conn.libjuju.add_model.assert_called_once_with( - model_name=self.kdu_instance, - cloud_name=self.cluster_uuid, - credential_name="cred-{}".format(self.cluster_uuid), - ) + self.k8s_juju_conn.libjuju.add_model.assert_called_once() self.k8s_juju_conn.libjuju.deploy.assert_called_once_with( self.cs_bundle, model_name=self.kdu_instance, @@ -392,11 +270,7 @@ class InstallTest(K8sJujuConnTestCase): timeout=1800, ) ) - self.k8s_juju_conn.libjuju.add_model.assert_called_once_with( - model_name=self.kdu_instance, - cloud_name=self.cluster_uuid, - credential_name="cred-{}".format(self.cluster_uuid), - ) + self.k8s_juju_conn.libjuju.add_model.assert_called_once() self.k8s_juju_conn.libjuju.deploy.assert_called_once_with( self.http_bundle, model_name=self.kdu_instance, @@ -415,11 +289,7 @@ class InstallTest(K8sJujuConnTestCase): timeout=1800, ) ) - self.k8s_juju_conn.libjuju.add_model.assert_called_once_with( - model_name=self.kdu_instance, - cloud_name=self.cluster_uuid, - credential_name="cred-{}".format(self.cluster_uuid), - ) + self.k8s_juju_conn.libjuju.add_model.assert_called_once() self.k8s_juju_conn.libjuju.deploy.assert_called_once_with( self.cs_bundle, model_name=self.kdu_instance, @@ -458,11 +328,7 @@ class InstallTest(K8sJujuConnTestCase): timeout=1800, ) ) - self.k8s_juju_conn.libjuju.add_model.assert_called_once_with( - model_name=self.kdu_instance, - cloud_name=self.cluster_uuid, - credential_name="cred-{}".format(self.cluster_uuid), - ) + self.k8s_juju_conn.libjuju.add_model.assert_called_once() self.k8s_juju_conn.libjuju.deploy.assert_called_once_with( self.cs_bundle, model_name=self.kdu_instance, @@ -500,11 +366,7 @@ class InstallTest(K8sJujuConnTestCase): timeout=1800, ) ) - self.k8s_juju_conn.libjuju.add_model.assert_called_once_with( - model_name=self.kdu_instance, - cloud_name=self.cluster_uuid, - credential_name="cred-{}".format(self.cluster_uuid), - ) + self.k8s_juju_conn.libjuju.add_model.assert_called_once() self.k8s_juju_conn.libjuju.deploy.assert_called_once_with( "local:{}".format(self.local_bundle), model_name=self.kdu_instance, diff --git a/n2vc/tests/unit/test_libjuju.py b/n2vc/tests/unit/test_libjuju.py index 29bcb7b..fde6817 100644 --- a/n2vc/tests/unit/test_libjuju.py +++ b/n2vc/tests/unit/test_libjuju.py @@ -15,12 +15,12 @@ import asyncio import asynctest import tempfile -from unittest import mock +from unittest.mock import Mock, patch import juju import kubernetes from juju.errors import JujuAPIError import logging -from .utils import FakeN2VC, FakeMachine, FakeApplication +from .utils import FakeMachine, FakeApplication from n2vc.libjuju import Libjuju from n2vc.exceptions import ( JujuControllerFailedConnecting, @@ -33,122 +33,46 @@ from n2vc.exceptions import ( JujuError, ) from n2vc.k8s_juju_conn import generate_rbac_id +from n2vc.tests.unit.utils import AsyncMock +from n2vc.vca.connection import Connection +from n2vc.vca.connection_data import ConnectionData +cacert = """-----BEGIN CERTIFICATE----- +SOMECERT +-----END CERTIFICATE-----""" + + +@asynctest.mock.patch("n2vc.libjuju.Controller") class LibjujuTestCase(asynctest.TestCase): - @asynctest.mock.patch("juju.controller.Controller.update_endpoints") - @asynctest.mock.patch("juju.client.connector.Connector.connect") - @asynctest.mock.patch("juju.controller.Controller.connection") - @asynctest.mock.patch("n2vc.libjuju.Libjuju._get_api_endpoints_db") + @asynctest.mock.patch("n2vc.vca.connection_data.base64_to_cacert") def setUp( self, - mock__get_api_endpoints_db=None, - mock_connection=None, - mock_connect=None, - mock_update_endpoints=None, - ): - loop = asyncio.get_event_loop() - n2vc = FakeN2VC() - mock__get_api_endpoints_db.return_value = ["127.0.0.1:17070"] - endpoints = "127.0.0.1:17070" - username = "admin" - password = "secret" - cacert = """ - -----BEGIN CERTIFICATE----- - SOMECERT - -----END CERTIFICATE-----""" - self.libjuju = Libjuju( - endpoints, - "192.168.0.155:17070", - username, - password, - cacert, - loop, - log=None, - db={"get_one": []}, - n2vc=n2vc, - ) - logging.disable(logging.CRITICAL) - loop.run_until_complete(self.libjuju.disconnect()) - - -@asynctest.mock.patch("n2vc.libjuju.Libjuju._create_health_check_task") -@asynctest.mock.patch("n2vc.libjuju.Libjuju._update_api_endpoints_db") -@asynctest.mock.patch("n2vc.libjuju.Libjuju._get_api_endpoints_db") -class LibjujuInitTestCase(asynctest.TestCase): - def setUp(self): + mock_base64_to_cacert=None, + ): self.loop = asyncio.get_event_loop() - self.n2vc = FakeN2VC() - self.endpoint = "192.168.100.100:17070" - self.username = "admin" - self.password = "secret" - self.cacert = """ - -----BEGIN CERTIFICATE----- - SOMECERT - -----END CERTIFICATE-----""" - - def test_endpoint_not_in_db( - self, - mock__get_api_endpoints_db, - mock_update_endpoints, - mock_create_health_check_task, - ): - mock__get_api_endpoints_db.return_value = ["another_ip"] - Libjuju( - self.endpoint, - "192.168.0.155:17070", - self.username, - self.password, - self.cacert, - self.loop, - log=None, - db={"get_one": []}, - n2vc=self.n2vc, + self.db = Mock() + mock_base64_to_cacert.return_value = cacert + Connection._load_vca_connection_data = Mock() + vca_connection = Connection(AsyncMock()) + vca_connection._data = ConnectionData( + **{ + "endpoints": ["1.2.3.4:17070"], + "user": "user", + "secret": "secret", + "cacert": "cacert", + "pubkey": "pubkey", + "lxd-cloud": "cloud", + "lxd-credentials": "credentials", + "k8s-cloud": "k8s_cloud", + "k8s-credentials": "k8s_credentials", + "model-config": {}, + "api-proxy": "api_proxy", + } ) - mock_update_endpoints.assert_called_once_with([self.endpoint]) - mock__get_api_endpoints_db.assert_called_once() - - def test_endpoint_in_db( - self, - mock__get_api_endpoints_db, - mock_update_endpoints, - mock_create_health_check_task, - ): - mock__get_api_endpoints_db.return_value = [self.endpoint, "another_ip"] - Libjuju( - self.endpoint, - "192.168.0.155:17070", - self.username, - self.password, - self.cacert, - self.loop, - log=None, - db={"get_one": []}, - n2vc=self.n2vc, - ) - mock_update_endpoints.assert_not_called() - mock__get_api_endpoints_db.assert_called_once() - - def test_no_db_endpoints( - self, - mock__get_api_endpoints_db, - mock_update_endpoints, - mock_create_health_check_task, - ): - mock__get_api_endpoints_db.return_value = None - Libjuju( - self.endpoint, - "192.168.0.155:17070", - self.username, - self.password, - self.cacert, - self.loop, - log=None, - db={"get_one": []}, - n2vc=self.n2vc, - ) - mock_update_endpoints.assert_called_once_with([self.endpoint]) - mock__get_api_endpoints_db.assert_called_once() + logging.disable(logging.CRITICAL) + self.libjuju = Libjuju(vca_connection, self.loop) + self.loop.run_until_complete(self.libjuju.disconnect()) @asynctest.mock.patch("juju.controller.Controller.connect") @@ -156,41 +80,34 @@ class LibjujuInitTestCase(asynctest.TestCase): "juju.controller.Controller.api_endpoints", new_callable=asynctest.CoroutineMock(return_value=["127.0.0.1:17070"]), ) -@asynctest.mock.patch("n2vc.libjuju.Libjuju._update_api_endpoints_db") class GetControllerTest(LibjujuTestCase): def setUp(self): super(GetControllerTest, self).setUp() - def test_diff_endpoint( - self, mock__update_api_endpoints_db, mock_api_endpoints, mock_connect - ): + def test_diff_endpoint(self, mock_api_endpoints, mock_connect): self.libjuju.endpoints = [] controller = self.loop.run_until_complete(self.libjuju.get_controller()) - mock__update_api_endpoints_db.assert_called_once_with(["127.0.0.1:17070"]) self.assertIsInstance(controller, juju.controller.Controller) @asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller") def test_exception( self, mock_disconnect_controller, - mock__update_api_endpoints_db, mock_api_endpoints, mock_connect, ): self.libjuju.endpoints = [] - mock__update_api_endpoints_db.side_effect = Exception() + + mock_connect.side_effect = Exception() controller = None with self.assertRaises(JujuControllerFailedConnecting): controller = self.loop.run_until_complete(self.libjuju.get_controller()) self.assertIsNone(controller) mock_disconnect_controller.assert_called_once() - def test_same_endpoint_get_controller( - self, mock__update_api_endpoints_db, mock_api_endpoints, mock_connect - ): + def test_same_endpoint_get_controller(self, mock_api_endpoints, mock_connect): self.libjuju.endpoints = ["127.0.0.1:17070"] controller = self.loop.run_until_complete(self.libjuju.get_controller()) - mock__update_api_endpoints_db.assert_not_called() self.assertIsInstance(controller, juju.controller.Controller) @@ -231,9 +148,7 @@ class AddModelTest(LibjujuTestCase): mock_model_exists.return_value = True # This should not raise an exception - self.loop.run_until_complete( - self.libjuju.add_model("existing_model", "cloud") - ) + self.loop.run_until_complete(self.libjuju.add_model("existing_model", "cloud")) mock_disconnect_controller.assert_called() @@ -251,7 +166,7 @@ class AddModelTest(LibjujuTestCase): mock_get_controller.return_value = juju.controller.Controller() self.loop.run_until_complete( - self.libjuju.add_model("nonexisting_model", "cloud") + self.libjuju.add_model("nonexisting_model", Mock()) ) mock_add_model.assert_called_once() @@ -1863,7 +1778,7 @@ class GetK8sCloudCredentials(LibjujuTestCase): mock_configuration.key_file = None self.token = None self.cert_data = None - with mock.patch.object(self.libjuju.log, "debug") as mock_debug: + with patch.object(self.libjuju.log, "debug") as mock_debug: credential = self.libjuju.get_k8s_cloud_credential( mock_configuration, self.cert_data, diff --git a/n2vc/tests/unit/test_n2vc_juju_conn.py b/n2vc/tests/unit/test_n2vc_juju_conn.py index e5e26be..d89de3f 100644 --- a/n2vc/tests/unit/test_n2vc_juju_conn.py +++ b/n2vc/tests/unit/test_n2vc_juju_conn.py @@ -22,143 +22,145 @@ import asynctest from n2vc.n2vc_juju_conn import N2VCJujuConnector from osm_common import fslocal from n2vc.exceptions import ( - JujuK8sProxycharmNotSupported, N2VCBadArgumentsException, N2VCException, ) +from n2vc.tests.unit.utils import AsyncMock +from n2vc.vca.connection_data import ConnectionData class N2VCJujuConnTestCase(asynctest.TestCase): - @asynctest.mock.patch("n2vc.libjuju.Libjuju._create_health_check_task") - @asynctest.mock.patch("juju.controller.Controller.update_endpoints") - @asynctest.mock.patch("juju.client.connector.Connector.connect") - @asynctest.mock.patch("juju.controller.Controller.connection") - @asynctest.mock.patch("n2vc.libjuju.Libjuju._get_api_endpoints_db") + @asynctest.mock.patch("n2vc.n2vc_juju_conn.MotorStore") + @asynctest.mock.patch("n2vc.n2vc_juju_conn.get_connection") + @asynctest.mock.patch("n2vc.vca.connection_data.base64_to_cacert") def setUp( self, - mock__get_api_endpoints_db=None, - mock_connection=None, - mock_connect=None, - mock_update_endpoints=None, - mock__create_health_check_task=None, + mock_base64_to_cacert=None, + mock_get_connection=None, + mock_store=None, ): - mock__get_api_endpoints_db.return_value = ["2.2.2.2:17070"] - loop = asyncio.get_event_loop() - db = {} - vca_config = { - "secret": "secret", - "api_proxy": "api_proxy", - "cloud": "cloud", - "k8s_cloud": "k8s_cloud", - } - + self.loop = asyncio.get_event_loop() + self.db = Mock() + mock_base64_to_cacert.return_value = """ + -----BEGIN CERTIFICATE----- + SOMECERT + -----END CERTIFICATE-----""" + mock_store.return_value = AsyncMock() + mock_vca_connection = Mock() + mock_get_connection.return_value = mock_vca_connection + mock_vca_connection.data.return_value = ConnectionData( + **{ + "endpoints": ["1.2.3.4:17070"], + "user": "user", + "secret": "secret", + "cacert": "cacert", + "pubkey": "pubkey", + "lxd-cloud": "cloud", + "lxd-credentials": "credentials", + "k8s-cloud": "k8s_cloud", + "k8s-credentials": "k8s_credentials", + "model-config": {}, + "api-proxy": "api_proxy", + } + ) logging.disable(logging.CRITICAL) N2VCJujuConnector.get_public_key = Mock() self.n2vc = N2VCJujuConnector( - db=db, + db=self.db, fs=fslocal.FsLocal(), log=None, - loop=loop, - url="2.2.2.2:17070", - username="admin", - vca_config=vca_config, + loop=self.loop, on_update_db=None, ) N2VCJujuConnector.get_public_key.assert_not_called() + self.n2vc.libjuju = Mock() -@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_metrics") class GetMetricssTest(N2VCJujuConnTestCase): def setUp(self): super(GetMetricssTest, self).setUp() + self.n2vc.libjuju.get_metrics = AsyncMock() - def test_success(self, mock_get_metrics): + def test_success(self): _ = self.loop.run_until_complete(self.n2vc.get_metrics("model", "application")) - mock_get_metrics.assert_called_once() + self.n2vc.libjuju.get_metrics.assert_called_once() - def test_except(self, mock_get_metrics): - mock_get_metrics.side_effect = Exception() + def test_except(self): + self.n2vc.libjuju.get_metrics.side_effect = Exception() with self.assertRaises(Exception): _ = self.loop.run_until_complete( self.n2vc.get_metrics("model", "application") ) - mock_get_metrics.assert_called_once() + self.n2vc.libjuju.get_metrics.assert_called_once() -@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller") -@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model") -@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_executed_actions") -@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_actions") -@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_application_configs") -@asynctest.mock.patch("n2vc.libjuju.Libjuju._get_application") class UpdateVcaStatusTest(N2VCJujuConnTestCase): def setUp(self): super(UpdateVcaStatusTest, self).setUp() + self.n2vc.libjuju.get_controller = AsyncMock() + self.n2vc.libjuju.get_model = AsyncMock() + self.n2vc.libjuju.get_executed_actions = AsyncMock() + self.n2vc.libjuju.get_actions = AsyncMock() + self.n2vc.libjuju.get_application_configs = AsyncMock() + self.n2vc.libjuju._get_application = AsyncMock() def test_success( self, - mock_get_application, - mock_get_application_configs, - mock_get_actions, - mock_get_executed_actions, - mock_get_model, - mock_get_controller, ): - self.loop.run_until_complete(self.n2vc.update_vca_status( - {"model": {"applications": {"app": {"actions": {}}}}})) - mock_get_executed_actions.assert_called_once() - mock_get_actions.assert_called_once() - mock_get_application_configs.assert_called_once() + self.loop.run_until_complete( + self.n2vc.update_vca_status( + {"model": {"applications": {"app": {"actions": {}}}}} + ) + ) + self.n2vc.libjuju.get_executed_actions.assert_called_once() + self.n2vc.libjuju.get_actions.assert_called_once() + self.n2vc.libjuju.get_application_configs.assert_called_once() - def test_exception( - self, - mock_get_application, - mock_get_application_configs, - mock_get_actions, - mock_get_executed_actions, - mock_get_model, - mock_get_controller, - ): - mock_get_model.return_value = None - mock_get_executed_actions.side_effect = Exception() + def test_exception(self): + self.n2vc.libjuju.get_model.return_value = None + self.n2vc.libjuju.get_executed_actions.side_effect = Exception() with self.assertRaises(Exception): - self.loop.run_until_complete(self.n2vc.update_vca_status( - {"model": {"applications": {"app": {"actions": {}}}}})) - mock_get_executed_actions.assert_not_called() - mock_get_actions.assert_not_called_once() - mock_get_application_configs.assert_not_called_once() + self.loop.run_until_complete( + self.n2vc.update_vca_status( + {"model": {"applications": {"app": {"actions": {}}}}} + ) + ) + self.n2vc.libjuju.get_executed_actions.assert_not_called() + self.n2vc.libjuju.get_actions.assert_not_called_once() + self.n2vc.libjuju.get_application_configs.assert_not_called_once() -@asynctest.mock.patch("n2vc.libjuju.Libjuju.model_exists") @asynctest.mock.patch("osm_common.fslocal.FsLocal.file_exists") @asynctest.mock.patch( "osm_common.fslocal.FsLocal.path", new_callable=asynctest.PropertyMock, create=True ) -@asynctest.mock.patch("n2vc.libjuju.Libjuju.deploy_charm") -@asynctest.mock.patch("n2vc.libjuju.Libjuju.add_model") class K8sProxyCharmsTest(N2VCJujuConnTestCase): def setUp(self): super(K8sProxyCharmsTest, self).setUp() + self.n2vc.libjuju.model_exists = AsyncMock() + self.n2vc.libjuju.add_model = AsyncMock() + self.n2vc.libjuju.deploy_charm = AsyncMock() + self.n2vc.libjuju.model_exists.return_value = False def test_success( - self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists + self, + mock_path, + mock_file_exists, ): - mock_model_exists.return_value = None mock_file_exists.return_value = True mock_path.return_value = "/path" ee_id = self.loop.run_until_complete( self.n2vc.install_k8s_proxy_charm( - "charm", "nsi-id.ns-id.vnf-id.vdu", "////path/", {}, + "charm", + "nsi-id.ns-id.vnf-id.vdu", + "////path/", + {}, ) ) - mock_add_model.assert_called_once_with( - "ns-id-k8s", - cloud_name=self.n2vc.k8s_cloud, - credential_name=self.n2vc.k8s_cloud - ) - mock_deploy_charm.assert_called_once_with( + self.n2vc.libjuju.add_model.assert_called_once() + self.n2vc.libjuju.deploy_charm.assert_called_once_with( model_name="ns-id-k8s", application_name="app-vnf-vnf-id-vdu-vdu", path="/path/path/", @@ -170,74 +172,70 @@ class K8sProxyCharmsTest(N2VCJujuConnTestCase): ) self.assertEqual(ee_id, "ns-id-k8s.app-vnf-vnf-id-vdu-vdu.k8s") - @asynctest.mock.patch( - "n2vc.n2vc_juju_conn.N2VCJujuConnector.k8s_cloud", - new_callable=asynctest.PropertyMock, - create=True, - ) - def test_no_k8s_cloud( + def test_no_artifact_path( self, - mock_k8s_cloud, - mock_add_model, - mock_deploy_charm, mock_path, mock_file_exists, - mock_model_exists, - ): - mock_k8s_cloud.return_value = None - with self.assertRaises(JujuK8sProxycharmNotSupported): - ee_id = self.loop.run_until_complete( - self.n2vc.install_k8s_proxy_charm( - "charm", "nsi-id.ns-id.vnf-id.vdu", "/path/", {}, - ) - ) - self.assertIsNone(ee_id) - - def test_no_artifact_path( - self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists, ): with self.assertRaises(N2VCBadArgumentsException): ee_id = self.loop.run_until_complete( self.n2vc.install_k8s_proxy_charm( - "charm", "nsi-id.ns-id.vnf-id.vdu", "", {}, + "charm", + "nsi-id.ns-id.vnf-id.vdu", + "", + {}, ) ) self.assertIsNone(ee_id) def test_no_db( - self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists, + self, + mock_path, + mock_file_exists, ): with self.assertRaises(N2VCBadArgumentsException): ee_id = self.loop.run_until_complete( self.n2vc.install_k8s_proxy_charm( - "charm", "nsi-id.ns-id.vnf-id.vdu", "/path/", None, + "charm", + "nsi-id.ns-id.vnf-id.vdu", + "/path/", + None, ) ) self.assertIsNone(ee_id) def test_file_not_exists( - self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists, + self, + mock_path, + mock_file_exists, ): mock_file_exists.return_value = False with self.assertRaises(N2VCBadArgumentsException): ee_id = self.loop.run_until_complete( self.n2vc.install_k8s_proxy_charm( - "charm", "nsi-id.ns-id.vnf-id.vdu", "/path/", {}, + "charm", + "nsi-id.ns-id.vnf-id.vdu", + "/path/", + {}, ) ) self.assertIsNone(ee_id) def test_exception( - self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists, + self, + mock_path, + mock_file_exists, ): - mock_model_exists.return_value = None mock_file_exists.return_value = True mock_path.return_value = "/path" - mock_deploy_charm.side_effect = Exception() + self.n2vc.libjuju.deploy_charm.side_effect = Exception() with self.assertRaises(N2VCException): ee_id = self.loop.run_until_complete( self.n2vc.install_k8s_proxy_charm( - "charm", "nsi-id.ns-id.vnf-id.vdu", "path/", {}, + "charm", + "nsi-id.ns-id.vnf-id.vdu", + "path/", + {}, ) ) self.assertIsNone(ee_id) diff --git a/n2vc/tests/unit/test_store.py b/n2vc/tests/unit/test_store.py new file mode 100644 index 0000000..c7aa2d6 --- /dev/null +++ b/n2vc/tests/unit/test_store.py @@ -0,0 +1,288 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from base64 import b64decode +from unittest import TestCase +from unittest.mock import Mock, patch + + +from n2vc.store import DbMongoStore, MotorStore +from n2vc.vca.connection_data import ConnectionData +from n2vc.tests.unit.utils import AsyncMock +from osm_common.dbmongo import DbException + + +class TestDbMongoStore(TestCase): + def setUp(self): + self.store = DbMongoStore(Mock()) + self.loop = asyncio.get_event_loop() + + @patch("n2vc.vca.connection_data.base64_to_cacert") + def test_get_vca_connection_data(self, mock_base64_to_cacert): + mock_base64_to_cacert.return_value = "cacert" + conn_data = { + "endpoints": ["1.2.3.4:17070"], + "user": "admin", + "secret": "1234", + "cacert": "cacert", + "pubkey": "pubkey", + "lxd-cloud": "lxd-cloud", + "lxd-credentials": "lxd-credentials", + "k8s-cloud": "k8s-cloud", + "k8s-credentials": "k8s-credentials", + "model-config": {}, + "api-proxy": None, + } + db_get_one = conn_data.copy() + db_get_one.update({"schema_version": "1.1", "_id": "id"}) + self.store.db.get_one.return_value = db_get_one + connection_data = self.loop.run_until_complete( + self.store.get_vca_connection_data("vca_id") + ) + self.assertTrue( + all( + connection_data.__dict__[k.replace("-", "_")] == v + for k, v in conn_data.items() + ) + ) + + def test_update_vca_endpoints(self): + endpoints = ["1.2.3.4:17070"] + self.store.db.get_one.side_effect = [None, {"api_endpoints": []}] + self.store.db.create.side_effect = DbException("already exists") + self.loop.run_until_complete(self.store.update_vca_endpoints(endpoints)) + self.assertEqual(self.store.db.get_one.call_count, 2) + Mock() + self.store.db.set_one.assert_called_once_with( + "vca", {"_id": "juju"}, {"api_endpoints": endpoints} + ) + + def test_update_vca_endpoints_exception(self): + endpoints = ["1.2.3.4:17070"] + self.store.db.get_one.side_effect = [None, None] + self.store.db.create.side_effect = DbException("already exists") + with self.assertRaises(DbException): + self.loop.run_until_complete(self.store.update_vca_endpoints(endpoints)) + self.assertEqual(self.store.db.get_one.call_count, 2) + self.store.db.set_one.assert_not_called() + + def test_update_vca_endpoints_with_vca_id(self): + endpoints = ["1.2.3.4:17070"] + self.store.db.get_one.return_value = {} + self.loop.run_until_complete( + self.store.update_vca_endpoints(endpoints, "vca_id") + ) + self.store.db.get_one.assert_called_once_with("vca", q_filter={"_id": "vca_id"}) + self.store.db.replace.assert_called_once_with( + "vca", "vca_id", {"endpoints": endpoints} + ) + + def test_get_vca_endpoints(self): + endpoints = ["1.2.3.4:17070"] + db_data = {"api_endpoints": endpoints} + db_returns = [db_data, None] + expected_returns = [endpoints, []] + returns = [] + self.store._get_juju_info = Mock() + self.store._get_juju_info.side_effect = db_returns + for _ in range(len(db_returns)): + e = self.loop.run_until_complete(self.store.get_vca_endpoints()) + returns.append(e) + self.assertEqual(expected_returns, returns) + + @patch("n2vc.vca.connection_data.base64_to_cacert") + def test_get_vca_endpoints_with_vca_id(self, mock_base64_to_cacert): + expected_endpoints = ["1.2.3.4:17070"] + mock_base64_to_cacert.return_value = "cacert" + self.store.get_vca_connection_data = Mock() + self.store.get_vca_connection_data.return_value = ConnectionData( + **{ + "endpoints": expected_endpoints, + "user": "admin", + "secret": "1234", + "cacert": "cacert", + } + ) + endpoints = self.loop.run_until_complete(self.store.get_vca_endpoints("vca_id")) + self.store.get_vca_connection_data.assert_called_with("vca_id") + self.assertEqual(expected_endpoints, endpoints) + + def test_get_vca_id(self): + self.assertIsNone(self.loop.run_until_complete(self.store.get_vca_id())) + + def test_get_vca_id_with_vim_id(self): + self.store.db.get_one.return_value = {"vca": "vca_id"} + vca_id = self.loop.run_until_complete(self.store.get_vca_id("vim_id")) + self.store.db.get_one.assert_called_once_with( + "vim_accounts", q_filter={"_id": "vim_id"}, fail_on_empty=False + ) + self.assertEqual(vca_id, "vca_id") + + +class TestMotorStore(TestCase): + def setUp(self): + self.store = MotorStore("uri") + self.vca_collection = Mock() + self.vca_collection.find_one = AsyncMock() + self.vca_collection.insert_one = AsyncMock() + self.vca_collection.replace_one = AsyncMock() + self.admin_collection = Mock() + self.admin_collection.find_one = AsyncMock() + self.admin_collection.insert_one = AsyncMock() + self.admin_collection.replace_one = AsyncMock() + self.vim_accounts_collection = Mock() + self.vim_accounts_collection.find_one = AsyncMock() + self.store._client = { + "osm": { + "vca": self.vca_collection, + "admin": self.admin_collection, + "vim_accounts": self.vim_accounts_collection, + } + } + self.store._config = {"database_commonkey": "osm"} + # self.store.decrypt_fields = Mock() + self.loop = asyncio.get_event_loop() + + @patch("n2vc.vca.connection_data.base64_to_cacert") + def test_get_vca_connection_data(self, mock_base64_to_cacert): + mock_base64_to_cacert.return_value = "cacert" + conn_data = { + "endpoints": ["1.2.3.4:17070"], + "user": "admin", + "secret": "1234", + "cacert": "cacert", + "pubkey": "pubkey", + "lxd-cloud": "lxd-cloud", + "lxd-credentials": "lxd-credentials", + "k8s-cloud": "k8s-cloud", + "k8s-credentials": "k8s-credentials", + "model-config": {}, + "api-proxy": None, + } + db_find_one = conn_data.copy() + db_find_one.update({"schema_version": "1.1", "_id": "id"}) + self.vca_collection.find_one.return_value = db_find_one + self.store.decrypt_fields = AsyncMock() + connection_data = self.loop.run_until_complete( + self.store.get_vca_connection_data("vca_id") + ) + self.assertTrue( + all( + connection_data.__dict__[k.replace("-", "_")] == v + for k, v in conn_data.items() + ) + ) + + @patch("n2vc.vca.connection_data.base64_to_cacert") + def test_get_vca_connection_data_exception(self, mock_base64_to_cacert): + mock_base64_to_cacert.return_value = "cacert" + self.vca_collection.find_one.return_value = None + with self.assertRaises(Exception): + self.loop.run_until_complete(self.store.get_vca_connection_data("vca_id")) + + def test_update_vca_endpoints(self): + endpoints = ["1.2.3.4:17070"] + self.admin_collection.find_one.side_effect = [None, {"api_endpoints": []}] + self.admin_collection.insert_one.side_effect = DbException("already exists") + self.loop.run_until_complete(self.store.update_vca_endpoints(endpoints)) + self.assertEqual(self.admin_collection.find_one.call_count, 2) + self.admin_collection.replace_one.assert_called_once_with( + {"_id": "juju"}, {"api_endpoints": ["1.2.3.4:17070"]} + ) + + def test_get_vca_connection_data_with_id(self): + secret = "e7b253af37785045d1ca08b8d929e556" + encrypted_secret = "kI46kRJh828ExSNpr16OG/q5a5/qTsE0bsHrv/W/2/g=" + cacert = "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ4ekNDQWx1Z0F3SUJBZ0lVRWlzTTBoQWxiYzQ0Z1ZhZWh6bS80ZUsyNnRZd0RRWUpLb1pJaHZjTkFRRUwKQlFBd0lURU5NQXNHQTFVRUNoTUVTblZxZFRFUU1BNEdBMVVFQXhNSGFuVnFkUzFqWVRBZUZ3MHlNVEEwTWpNeApNRFV3TXpSYUZ3MHpNVEEwTWpNeE1EVTFNelJhTUNFeERUQUxCZ05WQkFvVEJFcDFhblV4RURBT0JnTlZCQU1UCkIycDFhblV0WTJFd2dnR2lNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJqd0F3Z2dHS0FvSUJnUUNhTmFvNGZab2gKTDJWYThtdy9LdCs3RG9tMHBYTlIvbEUxSHJyVmZvbmZqZFVQV01zSHpTSjJZZXlXcUNSd3BiaHlLaE82N1c1dgpUY2RsV3Y3WGFLTGtsdVkraDBZY3BQT3BFTmZZYmxrNGk0QkV1L0wzYVY5MFFkUFFrMG94S01CS2R5QlBNZVNNCkJmS2pPWXdyOGgzM0ZWUWhmVkJnMXVGZ2tGaDdTamNuNHczUFdvc1BCMjNiVHBCbGR3VE9zemN4Qm9TaDNSVTkKTzZjb3lQdDdEN0drOCtHRlA3RGRUQTdoV1RkaUM4cDBkeHp2RUNmY0psMXNFeFEyZVprS1QvVzZyelNtVDhUTApCM0ErM1FDRDhEOEVsQU1IVy9zS25SeHphYU8welpNVmVlQnRnNlFGZ1F3M0dJMGo2ZTY0K2w3VExoOW8wSkZVCjdpUitPY01xUzVDY0NROGpWV3JPSk9Xc2dEbDZ4T2FFREczYnR5SVJHY29jbVcvcEZFQjNZd1A2S1BRTUIrNXkKWDdnZExEWmFGRFBVakZmblhkMnhHdUZlMnpRTDNVbXZEUkZuUlBBaW02QlpQbWo1OFh2emFhZXROa3lyaUZLZwp4Z0Z1dVpTcDUwV2JWdjF0MkdzOTMrRE53NlhFZHRFYnlWWUNBa28xTTY0MkozczFnN3NoQnRFQ0F3RUFBYU1qCk1DRXdEZ1lEVlIwUEFRSC9CQVFEQWdLa01BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0RRWUpLb1pJaHZjTkFRRUwKQlFBRGdnR0JBRXYxM2o2ZGFVbDBqeERPSnNTV1ZJZS9JdXNXVTRpN2ZXSWlqMHAwRU1GNS9LTE8yemRndTR5SQoreVd2T3N5aVFPanEzMlRYVlo2bTRDSnBkR1dGVE5HK2lLdXVOU3M0N3g3Q3dmVUNBWm5VVzhyamd3ZWJyS3BmCkJMNEVQcTZTcW0rSmltN0VPankyMWJkY2cyUXdZb3A3eUhvaHcveWEvL0l6RTMzVzZxNHlJeEFvNDBVYUhPTEMKTGtGbnNVYitjcFZBeFlPZGp6bjFzNWhnclpuWXlETEl3WmtIdFdEWm94alUzeC9jdnZzZ1FzLytzTWYrRFU4RgpZMkJKRHJjQ1VQM2xzclc0QVpFMFplZkEwOTlncFEvb3dSN0REYnMwSjZUeFM4NGt6Tldjc1FuWnRraXZheHJNClkyVHNnaWVndFExVFdGRWpxLy9sUFV4emJCdmpnd1FBZm5CQXZGeVNKejdTa0VuVm5rUXJGaUlUQVArTHljQVIKMlg4UFI2ZGI1bEt0SitBSENDM3kvZmNQS2k0ZzNTL3djeXRRdmdvOXJ6ODRFalp5YUNTaGJXNG9jNzNrMS9RcAowQWtHRDU0ZGVDWWVPYVJNbW96c0w3ZzdxWkpFekhtODdOcVBYSy9EZFoweWNxaVFhMXY2T3QxNjdXNUlzMUkzCjBWb0IzUzloSlE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCgo=" # noqa: E501 + encrypted_cacert = "QeV4evTLXzcKwZZvmXQ/OvSHToXH3ISwfoLmU+Q9JlQWAFUHSJ9IhO0ewaQrJmx3NkfFb7NCxsQhh+wE57zDW4rWgn4w/SWkzvwSi1h2xYOO3ECEHzzVqgUm15Sk0xaj1Fv9Ed4hipf6PRijeOZ7A1G9zekr1w9WIvebMyJZrK+f6QJ8AP20NUZqG/3k+MeJr3kjrl+8uwU5aPOrHAexSQGAqSKTkWzW7glmlyMWTjwkuSgNVgFg0ctdWTZ5JnNwxXbpjwIKrC4E4sIHcxko2vsTeLF8pZFPk+3QUZIg8BrgtyM3lJC2kO1g3emPQhCIk3VDb5GBgssc/GyFyRXNS651d5BNgcABOKZ4Rv/gGnprB35zP7TKJKkST44XJTEBiugWMkSZg+T9H98/l3eE34O6thfTZXgIyG+ZM6uGlW2XOce0OoEIyJiEL039WJe3izjbD3b9sCCdgQc0MgS+hTaayJI6oCUWPsJLmRji19jLi/wjOsU5gPItCFWw3pBye/A4Zf8Hxm+hShvqBnk8R2yx1fPTiyw/Zx4Jn8m49XQJyjDSZnhIck0PVHR9xWzKCr++PKljLMLdkdFxVRVPFQk/FBbesqofjSXsq9DASY6ACTL3Jmignx2OXD6ac4SlBqCTjV2dIM0yEgZF7zwMNCtppRdXTV8S29JP4W2mfaiqXCUSRTggv8EYU+9diCE+8sPB6HjuLrsfiySbFlYR2m4ysDGXjsVx5CDAf0Nh4IRfcSceYnnBGIQ2sfgGcJFOZoJqr/QeE2NWz6jlWYbWT7MjS/0decpKxP7L88qrR+F48WXQvfsvjWgKjlMKw7lHmFF8FeY836VWWICTRZx+y6IlY1Ys2ML4kySF27Hal4OPhOOoBljMNMVwUEvBulOnKUWw4BGz8eGCl8Hw6tlyJdC7kcBj/aCyNCR/NnuDk4Wck6e//He8L6mS83OJi/hIFc8vYQxnCJMXj9Ou7wr5hxtBnvxXzZM3kFHxCDO24Cd5UyBV9GD8TiQJfBGAy7a2BCBMb5ESVX8NOkyyv2hXMHOjpnKhUM9yP3Ke4CBImO7mCKJNHdFVtAmuyVKJ+jT6ooAAArkX2xwEAvBEpvGNmW2jgs6wxSuKY0h5aUm0rA4v/s8fqSZhzdInB54sMldyAnt9G+9e+g933DfyA/tkc56Ed0vZ/XEvTkThVHyUbfYR/Gjsoab1RpnDBi4aZ2E7iceoBshy+L6NXdL0jlWEs4ZubiWlbVNWlN/MqJcjV/quLU7q4HtkG0MDEFm6To3o48x7xpv8otih6YBduNqBFnwQ6Qz9rM2chFgOR4IgNSZKPxHO0AGCi1gnK/CeCvrSfWYAMn+2rmw0hMZybqKMStG28+rXsKDdqmy6vAwL/+dJwkAW+ix68rWRXpeqHlWidu4SkIBELuwEkFIC/GJU/DRvcN2GG9uP1m+VFifCIS2UdiO4OVrP6PVoW1O+jBJvFH3K1YT7CRqevb9OzjS9fO1wjkOff0W8zZyJK9Mp25aynpf0k3oMpZDpjnlOsFXFUb3N6SvXD1Yi95szIlmsr5yRYaeGUJH7/SAmMr8R6RqsCR0ANptL2dtRoGPi/qcDQE15vnjJ+QMYCg9KbCdV+Qq5di93XAjmwPj6tKZv0aXQuaTZgYR7bdLmAnJaFLbHWcQG1k6F/vdKNEb7llLsoAD9KuKXPZT/LErIyKcI0RZySy9yvhTZb4jQWn17b83yfvqfd5/2NpcyaY4gNERhDRJHw7VhoS5Leai5ZnFaO3C1vU9tIJ85XgCUASTsBLoQWVCKPSQZGxzF7PVLnHui3YA5OsOQpVqAPtgGZ12tP9XkEKj+u2/Atj2bgYrqBF7zUL64X/AQpwr/UElWDhJLSD/KStVeDOUx3AwAVVi9eTUJr6NiNMutCE1sqUf9XVIddgZ/BaG5t3NV2L+T+11QzAl+Xrh8wH/XeUCTmnU3NGkvCz/9Y7PMS+qQL7T7WeGdYmEhb5s/5p/yjSYeqybr5sANOHs83OdeSXbop9cLWW+JksHmS//rHHcrrJhZgCb3P0EOpEoEMCarT6sJq0V1Hwf/YNFdJ9V7Ac654ALS+a9ffNthMUEJeY21QMtNOrEg3QH5RWBPn+yOYN/f38tzwlT1k6Ec94y/sBmeQVv8rRzkkiMSXeAL5ATdJntq8NQq5JbvLQDNnZnHQthZt+uhcUf08mWlRrxxBUaE6xLppgMqFdYSjLGvgn/d8FZ9y7UCg5ZBhgP1rrRQL1COpNKKlJLf5laqwiGAucIDmzSbhO+MidSauDLWuv+fsdd2QYk98PHxqNrPYLrlAlABFi3JEApBm4IlrGbHxKg6dRiy7L1c9xWnAD7E3XrZrSc6DXvGRsjMXWoQdlp4CX5H3cdH9sjIE6akWqiwwrOP6QTbJcxmJGv/MVhsDVrVKmrKSn2H0/Us1fyYCHCOyCSc2L96uId8i9wQO1NXj+1PJmUq3tJ8U0TUwTblOEQdYej99xEI8EzsXLjNJHCgbDygtHBYd/SHToXH3ISwfoLmU+Q9JlS1woaUpVa5sdvbsr4BXR6J" # noqa: E501 + + self.vca_collection.find_one.return_value = { + "_id": "2ade7f0e-9b58-4dbd-93a3-4ec076185d39", + "schema_version": "1.11", + "endpoints": [], + "user": "admin", + "secret": encrypted_secret, + "cacert": encrypted_cacert, + } + self.admin_collection.find_one.return_value = { + "serial": b"l+U3HDp9td+UjQ+AN+Ypj/Uh7n3C+rMJueQNNxkIpWI=" + } + connection_data = self.loop.run_until_complete( + self.store.get_vca_connection_data("vca_id") + ) + self.assertEqual(connection_data.endpoints, []) + self.assertEqual(connection_data.user, "admin") + self.assertEqual(connection_data.secret, secret) + self.assertEqual( + connection_data.cacert, b64decode(cacert.encode("utf-8")).decode("utf-8") + ) + + def test_update_vca_endpoints_exception(self): + endpoints = ["1.2.3.4:17070"] + self.admin_collection.find_one.side_effect = [None, None] + self.admin_collection.insert_one.side_effect = DbException("already exists") + with self.assertRaises(DbException): + self.loop.run_until_complete(self.store.update_vca_endpoints(endpoints)) + self.assertEqual(self.admin_collection.find_one.call_count, 2) + self.admin_collection.replace_one.assert_not_called() + + def test_update_vca_endpoints_with_vca_id(self): + endpoints = ["1.2.3.4:17070"] + self.vca_collection.find_one.return_value = {} + self.loop.run_until_complete( + self.store.update_vca_endpoints(endpoints, "vca_id") + ) + self.vca_collection.find_one.assert_called_once_with({"_id": "vca_id"}) + self.vca_collection.replace_one.assert_called_once_with( + {"_id": "vca_id"}, {"endpoints": endpoints} + ) + + def test_get_vca_endpoints(self): + endpoints = ["1.2.3.4:17070"] + db_data = {"api_endpoints": endpoints} + db_returns = [db_data, None] + expected_returns = [endpoints, []] + returns = [] + self.admin_collection.find_one.side_effect = db_returns + for _ in range(len(db_returns)): + e = self.loop.run_until_complete(self.store.get_vca_endpoints()) + returns.append(e) + self.assertEqual(expected_returns, returns) + + @patch("n2vc.vca.connection_data.base64_to_cacert") + def test_get_vca_endpoints_with_vca_id(self, mock_base64_to_cacert): + expected_endpoints = ["1.2.3.4:17070"] + mock_base64_to_cacert.return_value = "cacert" + self.store.get_vca_connection_data = AsyncMock() + self.store.get_vca_connection_data.return_value = ConnectionData( + **{ + "endpoints": expected_endpoints, + "user": "admin", + "secret": "1234", + "cacert": "cacert", + } + ) + endpoints = self.loop.run_until_complete(self.store.get_vca_endpoints("vca_id")) + self.store.get_vca_connection_data.assert_called_with("vca_id") + self.assertEqual(expected_endpoints, endpoints) + + def test_get_vca_id(self): + self.assertIsNone(self.loop.run_until_complete((self.store.get_vca_id()))) + + def test_get_vca_id_with_vim_id(self): + self.vim_accounts_collection.find_one.return_value = {"vca": "vca_id"} + vca_id = self.loop.run_until_complete(self.store.get_vca_id("vim_id")) + self.vim_accounts_collection.find_one.assert_called_once_with({"_id": "vim_id"}) + self.assertEqual(vca_id, "vca_id") diff --git a/n2vc/tests/unit/test_utils.py b/n2vc/tests/unit/test_utils.py index c5ab84f..bffbc29 100644 --- a/n2vc/tests/unit/test_utils.py +++ b/n2vc/tests/unit/test_utils.py @@ -14,7 +14,7 @@ from unittest import TestCase -from n2vc.utils import Dict, EntityType, JujuStatusToOSM, N2VCDeploymentStatus, DB_DATA +from n2vc.utils import Dict, EntityType, JujuStatusToOSM, N2VCDeploymentStatus from juju.machine import Machine from juju.application import Application from juju.action import Action @@ -84,8 +84,3 @@ class UtilsTest(TestCase): osm_status = status["osm"] self.assertTrue(juju_status in JujuStatusToOSM[entity_type]) self.assertEqual(osm_status, JujuStatusToOSM[entity_type][juju_status]) - - def test_db_data(self): - self.assertEqual(DB_DATA.api_endpoints.table, "admin") - self.assertEqual(DB_DATA.api_endpoints.filter, {"_id": "juju"}) - self.assertEqual(DB_DATA.api_endpoints.key, "api_endpoints") diff --git a/n2vc/tests/unit/utils.py b/n2vc/tests/unit/utils.py index a727072..2f107a7 100644 --- a/n2vc/tests/unit/utils.py +++ b/n2vc/tests/unit/utils.py @@ -82,7 +82,18 @@ class FakeN2VC(MagicMock): detailed_status: str, vca_status: str, entity_type: str, + vca_id: str = None, ): + """ + Write application status to database + + :param: db_dict: DB dictionary + :param: status: Status of the application + :param: detailed_status: Detailed status + :param: vca_status: VCA status + :param: entity_type: Entity type ("application", "machine, and "action") + :param: vca_id: Id of the VCA. If None, the default VCA will be used. + """ self.last_written_values = Dict( { "n2vc_status": status, diff --git a/n2vc/utils.py b/n2vc/utils.py index 6e0f2c0..f0146a0 100644 --- a/n2vc/utils.py +++ b/n2vc/utils.py @@ -114,14 +114,6 @@ JujuStatusToOSM = { }, } -DB_DATA = Dict( - { - "api_endpoints": Dict( - {"table": "admin", "filter": {"_id": "juju"}, "key": "api_endpoints"} - ) - } -) - def obj_to_yaml(obj: object) -> str: """ diff --git a/n2vc/vca/__init__.py b/n2vc/vca/__init__.py new file mode 100644 index 0000000..aa5cee8 --- /dev/null +++ b/n2vc/vca/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/n2vc/vca/cloud.py b/n2vc/vca/cloud.py new file mode 100644 index 0000000..970fd93 --- /dev/null +++ b/n2vc/vca/cloud.py @@ -0,0 +1,25 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class Cloud: + def __init__(self, name: str, credential_name: str): + """ + Constructor + + :param: name: Name of the Cloud + :param: credential_name: Credential name for the Cloud + """ + self.name = name + self.credential_name = credential_name diff --git a/n2vc/vca/connection.py b/n2vc/vca/connection.py new file mode 100644 index 0000000..98de0ff --- /dev/null +++ b/n2vc/vca/connection.py @@ -0,0 +1,113 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +from n2vc.config import EnvironConfig, ModelConfig +from n2vc.store import Store +from n2vc.vca.cloud import Cloud +from n2vc.vca.connection_data import ConnectionData + + +class Connection: + def __init__(self, store: Store, vca_id: str = None): + """ + Contructor + + :param: store: Store object. Used to communicate wuth the DB + :param: vca_id: Id of the VCA. If none specified, the default VCA will be used. + """ + self._data = None + self.default = vca_id is None + self._vca_id = vca_id + self._store = store + + async def load(self): + """Load VCA connection data""" + await self._load_vca_connection_data() + + @property + def is_default(self): + return self._vca_id is None + + @property + def data(self) -> ConnectionData: + return self._data + + async def _load_vca_connection_data(self) -> typing.NoReturn: + """ + Load VCA connection data + + If self._vca_id is None, it will get the VCA data from the Environment variables, + and the default VCA will be used. If it is not None, then it means that it will + load the credentials from the database (A non-default VCA will be used). + """ + if self._vca_id: + self._data = await self._store.get_vca_connection_data(self._vca_id) + else: + envs = EnvironConfig() + # Get endpoints from the DB and ENV. Check if update in the database is needed or not. + db_endpoints = await self._store.get_vca_endpoints() + env_endpoints = ( + envs["endpoints"].split(",") + if "endpoints" in envs + else ["{}:{}".format(envs["host"], envs.get("port", 17070))] + ) + + db_update_needed = not all(e in db_endpoints for e in env_endpoints) + + endpoints = env_endpoints if db_update_needed else db_endpoints + config = { + "endpoints": endpoints, + "user": envs["user"], + "secret": envs["secret"], + "cacert": envs["cacert"], + "pubkey": envs["pubkey"], + "lxd-cloud": envs["cloud"], + "lxd-credentials": envs.get("credentials", envs["cloud"]), + "k8s-cloud": envs["k8s_cloud"], + "k8s-credentials": envs.get("k8s_credentials", envs["k8s_cloud"]), + "model-config": ModelConfig(envs), + "api-proxy": envs.get("api_proxy", None), + } + self._data = ConnectionData(**config) + if db_update_needed: + await self.update_endpoints(endpoints) + + @property + def endpoints(self): + return self._data.endpoints + + async def update_endpoints(self, endpoints: typing.List[str]): + await self._store.update_vca_endpoints(endpoints, self._vca_id) + self._data.endpoints = endpoints + + @property + def lxd_cloud(self) -> Cloud: + return Cloud(self.data.lxd_cloud, self.data.lxd_credentials) + + @property + def k8s_cloud(self) -> Cloud: + return Cloud(self.data.k8s_cloud, self.data.k8s_credentials) + + +async def get_connection(store: Store, vca_id: str = None) -> Connection: + """ + Get Connection + + Method to get a Connection object with the VCA information loaded + """ + connection = Connection(store, vca_id=vca_id) + await connection.load() + return connection diff --git a/n2vc/vca/connection_data.py b/n2vc/vca/connection_data.py new file mode 100644 index 0000000..a1eff21 --- /dev/null +++ b/n2vc/vca/connection_data.py @@ -0,0 +1,51 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from n2vc.utils import base64_to_cacert + + +class ConnectionData: + def __init__(self, **kwargs): + """ + Constructor + + :param: kwargs: + endpoints (list): Endpoints of all the Juju controller units + user (str): Username for authenticating to the controller + secret (str): Secret for authenticating to the controller + cacert (str): Base64 encoded CA certificate for authenticating to the controller + (optional) pubkey (str): Public key to insert to the charm. + This is useful to do `juju ssh`. + It is not very useful though. + TODO: Test it. + (optional) lxd-cloud (str): Name of the cloud to use for lxd proxy charms + (optional) lxd-credentials (str): Name of the lxd-cloud credentials + (optional) k8s-cloud (str): Name of the cloud to use for k8s proxy charms + (optional) k8s-credentials (str): Name of the k8s-cloud credentials + (optional) model-config (n2vc.config.ModelConfig): Config to apply in all Juju models + (deprecated, optional) api-proxy (str): Proxy IP to reach the controller. + Used in case native charms cannot react the controller. + """ + self.endpoints = kwargs["endpoints"] + self.user = kwargs["user"] + self.secret = kwargs["secret"] + self.cacert = base64_to_cacert(kwargs["cacert"]) + self.pubkey = kwargs.get("pubkey", "") + self.lxd_cloud = kwargs.get("lxd-cloud", None) + self.lxd_credentials = kwargs.get("lxd-credentials", None) + self.k8s_cloud = kwargs.get("k8s-cloud", None) + self.k8s_credentials = kwargs.get("k8s-credentials", None) + self.model_config = kwargs.get("model-config", {}) + self.model_config.update({"authorized-keys": self.pubkey}) + self.api_proxy = kwargs.get("api-proxy", None) diff --git a/requirements.in b/requirements.in index ea82d48..eb8534a 100644 --- a/requirements.in +++ b/requirements.in @@ -14,4 +14,6 @@ juju==2.8.4 kubernetes==10.0.1 -pyasn1 \ No newline at end of file +pyasn1 +motor==1.3.1 +retrying-async diff --git a/requirements.txt b/requirements.txt index 8ff8c08..2e394a5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ +async-timeout==3.0.1 + # via retrying-async bcrypt==3.2.0 # via paramiko cachetools==4.2.1 @@ -29,6 +31,8 @@ macaroonbakery==1.3.1 # via # juju # theblues +motor==1.3.1 + # via -r requirements.in mypy-extensions==0.4.3 # via typing-inspect oauthlib==3.1.0 @@ -49,6 +53,8 @@ pycparser==2.20 # via cffi pymacaroons==0.13.0 # via macaroonbakery +pymongo==3.11.3 + # via motor pynacl==1.4.0 # via # macaroonbakery @@ -75,6 +81,8 @@ requests==2.25.1 # macaroonbakery # requests-oauthlib # theblues +retrying-async==1.2.0 + # via -r requirements.in rsa==4.7.2 # via google-auth six==1.15.0