# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+import typing
+
+
+class EnvironConfig(dict):
+ prefixes = ["OSMLCM_VCA_", "OSMMON_VCA_"]
+
+ def __init__(self, prefixes: typing.List[str] = None):
+ if prefixes:
+ self.prefixes = prefixes
+ for key, value in os.environ.items():
+ if any(key.startswith(prefix) for prefix in self.prefixes):
+ self.__setitem__(self._get_renamed_key(key), value)
+
+ def _get_renamed_key(self, key: str) -> str:
+ for prefix in self.prefixes:
+ key = key.replace(prefix, "")
+ return key.lower()
+
+
MODEL_CONFIG_KEYS = [
"agent-metadata-url",
"agent-stream",
class JujuModelWatcher:
@staticmethod
- async def wait_for_model(model: Model, timeout: float = 3600):
+ async def wait_for_model(
+ model: Model,
+ timeout: float = 3600
+ ):
"""
Wait for all entities in model to reach its final state.
total_timeout: float = 3600,
db_dict: dict = None,
n2vc: N2VCConnector = None,
+ vca_id: str = None,
):
"""
Wait for entity to reach its final state.
:param: total_timeout: Timeout for the entity to be active
:param: db_dict: Dictionary with data of the DB to write the updates
:param: n2vc: N2VC Connector objector
+ :param: vca_id: VCA ID
:raises: asyncio.TimeoutError when timeout reaches
"""
timeout=progress_timeout,
db_dict=db_dict,
n2vc=n2vc,
+ vca_id=vca_id,
)
)
timeout: float,
db_dict: dict = None,
n2vc: N2VCConnector = None,
+ vca_id: str = None,
):
"""
Observes the changes related to an specific entity in a model
:param: timeout: Maximum time between two updates in the model
:param: db_dict: Dictionary with data of the DB to write the updates
:param: n2vc: N2VC Connector objector
+ :param: vca_id: VCA ID
:raises: asyncio.TimeoutError when timeout reaches
"""
detailed_status=status_message,
vca_status=vca_status,
entity_type=delta_entity,
+ vca_id=vca_id,
)
# Check if timeout
if time.time() > timeout_end:
db_dict: dict = None,
kdu_name: str = None,
namespace: str = None,
+ **kwargs,
):
+ """Install a helm chart
+
+ :param cluster_uuid str: The UUID of the cluster to install to
+ :param kdu_model str: The name or path of a bundle to install
+ :param kdu_instance: Kdu instance name
+ :param atomic bool: If set, waits until the model is active and resets
+ the cluster on failure.
+ :param timeout int: The time, in seconds, to wait for the install
+ to finish
+ :param params dict: Key-value pairs of instantiation parameters
+ :param kdu_name: Name of the KDU instance to be installed
+ :param namespace: K8s namespace to use for the KDU instance
+
+ :param kwargs: Additional parameters (None yet)
+
+ :return: True if successful
+ """
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
return namespace, cluster_id
async def init_env(
- self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
+ self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Charts
:param namespace: optional namespace to be used for helm. By default,
'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
+ :param kwargs: Additional parameters (None yet)
:return: uuid of the K8s cluster and True if connector has installed some
software in the cluster
(on error, an exception will be raised)
self.fs.reverse_sync(from_path=cluster_id)
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False, **kwargs
) -> bool:
+ """Reset a cluster
+ Resets the Kubernetes cluster by removing the helm deployment that represents it.
+
+ :param cluster_uuid: The UUID of the cluster to reset
+ :param force: Boolean to force the reset
+ :param uninstall_sw: Boolean to force the reset
+ :param kwargs: Additional parameters (None yet)
+ :return: Returns True if successful or raises an exception.
+ """
namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
.format(cluster_id, uninstall_sw))
else:
return 0
- async def uninstall(self, cluster_uuid: str, kdu_instance: str):
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
"""
Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
(this call should happen after all _terminate-config-primitive_ of the VNF
:param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
:param kdu_instance: unique name for the KDU instance to be deleted
+ :param kwargs: Additional parameters (None yet)
:return: True if successful
"""
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ **kwargs,
) -> str:
"""Exec primitive (Juju action)
:param timeout: Timeout for action execution
:param params: Dictionary of all the parameters needed for the action
:db_dict: Dictionary for any additional data
+ :param kwargs: Additional parameters (None yet)
:return: Returns the output of the action
"""
return service
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
+ """
+ This call would retrieve tha current state of a given KDU instance. It would be
+ would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
+ values_ of the configuration parameters applied to a given instance. This call
+ would be based on the `status` call.
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_instance: unique name for the KDU instance
+ :param kwargs: Additional parameters (None yet)
+ :return: If successful, it will return the following vector of arguments:
+ - K8s `namespace` in the cluster where the KDU lives
+ - `state` of the KDU instance. It can be:
+ - UNKNOWN
+ - DEPLOYED
+ - DELETED
+ - SUPERSEDED
+ - FAILED or
+ - DELETING
+ - List of `resources` (objects) that this release consists of, sorted by kind,
+ and the status of those resources
+ - Last `deployment_time`.
+
+ """
self.log.debug(
"status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
cluster_uuid, kdu_instance
db_dict: dict = None,
kdu_name: str = None,
namespace: str = None,
+ **kwargs,
):
+ """
+ Deploys of a new KDU instance. It would implicitly rely on the `install` call
+ to deploy the Chart/Bundle properly parametrized (in practice, this call would
+ happen before any _initial-config-primitive_of the VNF is called).
+
+ :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param kdu_model: chart/ reference (string), which can be either
+ of these options:
+ - a name of chart available via the repos known by OSM
+ - a path to a packaged chart
+ - a path to an unpacked chart directory or a URL
+ :param kdu_instance: Kdu instance name
+ :param atomic: If set, installation process purges chart/bundle on fail, also
+ will wait until all the K8s objects are active
+ :param timeout: Time in seconds to wait for the install of the chart/bundle
+ (defaults to Helm default timeout: 300s)
+ :param params: dictionary of key-value pairs for instantiation parameters
+ (overriding default values)
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with {collection: <str>, filter: {},
+ path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
+ :param kdu_name: Name of the KDU instance to be installed
+ :param namespace: K8s namespace to use for the KDU instance
+ :param kwargs: Additional parameters (None yet)
+ :return: True if successful
+ """
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
import binascii
import base64
-from n2vc.config import ModelConfig
-from n2vc.exceptions import K8sException, N2VCBadArgumentsException
+from n2vc.config import EnvironConfig
+from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
from .exceptions import MethodNotImplemented
-from n2vc.utils import base64_to_cacert
from n2vc.libjuju import Libjuju
from n2vc.utils import obj_to_dict, obj_to_yaml
-
+from n2vc.store import MotorStore
+from n2vc.vca.cloud import Cloud
+from n2vc.vca.connection import get_connection
from kubernetes.client.models import (
V1ClusterRole,
V1ObjectMeta,
class K8sJujuConnector(K8sConnector):
+ libjuju = None
+
def __init__(
self,
fs: object,
log: object = None,
loop: object = None,
on_update_db=None,
- vca_config: dict = None,
):
"""
:param fs: file system for kubernetes and helm configuration
self.loop = loop or asyncio.get_event_loop()
self.log.debug("Initializing K8S Juju connector")
- required_vca_config = [
- "host",
- "user",
- "secret",
- "ca_cert",
- ]
- if not vca_config or not all(k in vca_config for k in required_vca_config):
- raise N2VCBadArgumentsException(
- message="Missing arguments in vca_config: {}".format(vca_config),
- bad_args=required_vca_config,
- )
- port = vca_config["port"] if "port" in vca_config else 17070
- url = "{}:{}".format(vca_config["host"], port)
- model_config = ModelConfig(vca_config)
- username = vca_config["user"]
- secret = vca_config["secret"]
- ca_cert = base64_to_cacert(vca_config["ca_cert"])
-
- self.libjuju = Libjuju(
- endpoint=url,
- api_proxy=None, # Not needed for k8s charms
- model_config=model_config,
- username=username,
- password=secret,
- cacert=ca_cert,
- loop=self.loop,
- log=self.log,
- db=self.db,
- )
+ db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
+ self._store = MotorStore(db_uri)
+ self.loading_libjuju = asyncio.Lock(loop=self.loop)
+
self.log.debug("K8S Juju connector initialized")
# TODO: Remove these commented lines:
# self.authenticated = False
k8s_creds: str,
namespace: str = "kube-system",
reuse_cluster_uuid: str = None,
+ **kwargs,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Juju bundles.
:param namespace: optional namespace to be used for juju. By default,
'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
+
:return: uuid of the K8s cluster and True if connector has installed some
software in the cluster
(on error, an exception will be raised)
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
)
default_storage_class = kubectl.get_default_storage_class()
- await self.libjuju.add_k8s(
+ await libjuju.add_k8s(
name=cluster_uuid,
rbac_id=rbac_id,
token=token,
"""Reset"""
async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False,
+ **kwargs,
) -> bool:
"""Reset a cluster
Resets the Kubernetes cluster by removing the model that represents it.
:param cluster_uuid str: The UUID of the cluster to reset
+ :param force: Force reset
+ :param uninstall_sw: Boolean to uninstall sw
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
+
:return: Returns True if successful or raises an exception.
"""
try:
self.log.debug("[reset] Removing k8s cloud")
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
- cloud_creds = await self.libjuju.get_cloud_credentials(
- cluster_uuid,
- self._get_credential_name(cluster_uuid),
- )
+ cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
+
+ cloud_creds = await libjuju.get_cloud_credentials(cloud)
- await self.libjuju.remove_cloud(cluster_uuid)
+ await libjuju.remove_cloud(cluster_uuid)
kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
db_dict: dict = None,
kdu_name: str = None,
namespace: str = None,
+ **kwargs,
) -> bool:
"""Install a bundle
:param params dict: Key-value pairs of instantiation parameters
:param kdu_name: Name of the KDU instance to be installed
:param namespace: K8s namespace to use for the KDU instance
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: If successful, returns ?
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
bundle = kdu_model
if not db_dict:
# Create the new model
self.log.debug("Adding model: {}".format(kdu_instance))
- await self.libjuju.add_model(
- model_name=kdu_instance,
- cloud_name=cluster_uuid,
- credential_name=self._get_credential_name(cluster_uuid),
- )
+ cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
+ await libjuju.add_model(kdu_instance, cloud)
# if model:
# TODO: Instantiation parameters
previous_workdir = "/app/storage"
self.log.debug("[install] deploying {}".format(bundle))
- await self.libjuju.deploy(
+ await libjuju.deploy(
bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
)
os.chdir(previous_workdir)
if self.on_update_db:
- await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
+ await self.on_update_db(
+ cluster_uuid,
+ kdu_instance,
+ filter=db_dict["filter"],
+ vca_id=kwargs.get("vca_id")
+ )
return True
async def instances_list(self, cluster_uuid: str) -> list:
"""Deletion"""
- async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
+ async def uninstall(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ **kwargs,
+ ) -> bool:
"""Uninstall a KDU instance
:param cluster_uuid str: The UUID of the cluster
:param kdu_instance str: The unique name of the KDU instance
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns True if successful, or raises an exception
"""
self.log.debug("[uninstall] Destroying model")
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
- await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
+ await libjuju.destroy_model(kdu_instance, total_timeout=3600)
# self.log.debug("[uninstall] Model destroyed and disconnecting")
# await controller.disconnect()
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
+ **kwargs,
) -> str:
"""Exec primitive (Juju action)
:param primitive_name: Name of action that will be executed
:param timeout: Timeout for action execution
:param params: Dictionary of all the parameters needed for the action
- :db_dict: Dictionary for any additional data
+ :param db_dict: Dictionary for any additional data
+ :param kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns the output of the action
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
if not params or "application-name" not in params:
raise K8sException(
"kdu_instance: {}".format(kdu_instance)
)
application_name = params["application-name"]
- actions = await self.libjuju.get_actions(application_name, kdu_instance)
+ actions = await libjuju.get_actions(application_name, kdu_instance)
if primitive_name not in actions:
raise K8sException("Primitive {} not found".format(primitive_name))
- output, status = await self.libjuju.execute_action(
+ output, status = await libjuju.execute_action(
application_name, kdu_instance, primitive_name, **params
)
cluster_uuid: str,
kdu_instance: str,
complete_status: bool = False,
- yaml_format: bool = False
+ yaml_format: bool = False,
+ **kwargs,
) -> dict:
"""Get the status of the KDU
:param kdu_instance str: The unique id of the KDU instance
:param complete_status: To get the complete_status of the KDU
:param yaml_format: To get the status in proper format for NSR record
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: Returns a dictionary containing namespace, state, resources,
and deployment_time and returns complete_status if complete_status is True
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
status = {}
- model_status = await self.libjuju.get_model_status(kdu_instance)
+ model_status = await libjuju.get_model_status(kdu_instance)
if not complete_status:
for name in model_status.applications:
return status
- async def update_vca_status(self, vcastatus: dict, kdu_instance: str):
+ async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
"""
Add all configs, actions, executed actions of all applications in a model to vcastatus dict
:param vcastatus dict: dict containing vcastatus
:param kdu_instance str: The unique id of the KDU instance
+ :param: kwargs: Additional parameters
+ vca_id (str): VCA ID
:return: None
"""
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
try:
for model_name in vcastatus:
# Adding executed actions
vcastatus[model_name]["executedActions"] = \
- await self.libjuju.get_executed_actions(kdu_instance)
+ await libjuju.get_executed_actions(kdu_instance)
for application in vcastatus[model_name]["applications"]:
# Adding application actions
vcastatus[model_name]["applications"][application]["actions"] = \
- await self.libjuju.get_actions(application, kdu_instance)
+ await libjuju.get_actions(application, kdu_instance)
# Adding application configs
vcastatus[model_name]["applications"][application]["configs"] = \
- await self.libjuju.get_application_configs(kdu_instance, application)
+ await libjuju.get_application_configs(kdu_instance, application)
except Exception as e:
self.log.debug("Error in updating vca status: {}".format(str(e)))
else:
kdu_instance = db_dict["filter"]["_id"]
return kdu_instance
+
+ async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
+ """
+ Get libjuju object
+
+ :param: vca_id: VCA ID
+ If None, get a libjuju object with a Connection to the default VCA
+ Else, geta libjuju object with a Connection to the specified VCA
+ """
+ if not vca_id:
+ while self.loading_libjuju.locked():
+ await asyncio.sleep(0.1)
+ if not self.libjuju:
+ async with self.loading_libjuju:
+ vca_connection = await get_connection(self._store)
+ self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ return self.libjuju
+ else:
+ vca_connection = await get_connection(self._store, vca_id)
+ return Libjuju(
+ vca_connection,
+ loop=self.loop,
+ log=self.log,
+ n2vc=self,
+ )
import asyncio
import logging
+import typing
import time
from juju.client import client
from juju import tag
-from n2vc.config import ModelConfig
from n2vc.juju_watcher import JujuModelWatcher
from n2vc.provisioner import AsyncSSHProvisioner
from n2vc.n2vc_conn import N2VCConnector
JujuControllerFailedConnecting,
JujuApplicationExists,
JujuInvalidK8sConfiguration,
- JujuError
+ JujuError,
)
-from n2vc.utils import DB_DATA
-from osm_common.dbbase import DbException
+from n2vc.vca.cloud import Cloud as VcaCloud
+from n2vc.vca.connection import Connection
from kubernetes.client.configuration import Configuration
+from retrying_async import retry
+
RBAC_LABEL_KEY_NAME = "rbac-id"
class Libjuju:
def __init__(
self,
- endpoint: str,
- api_proxy: str,
- username: str,
- password: str,
- cacert: str,
+ vca_connection: Connection,
loop: asyncio.AbstractEventLoop = None,
log: logging.Logger = None,
- db: dict = None,
n2vc: N2VCConnector = None,
- model_config: ModelConfig = {},
):
"""
Constructor
- :param: endpoint: Endpoint of the juju controller (host:port)
- :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
- :param: username: Juju username
- :param: password: Juju password
- :param: cacert: Juju CA Certificate
+ :param: vca_connection: n2vc.vca.connection object
:param: loop: Asyncio loop
:param: log: Logger
- :param: db: DB object
:param: n2vc: N2VC object
- :param: apt_mirror: APT Mirror
- :param: enable_os_upgrade: Enable OS Upgrade
"""
self.log = log or logging.getLogger("Libjuju")
- self.db = db
- db_endpoints = self._get_api_endpoints_db()
- self.endpoints = None
- if (db_endpoints and endpoint not in db_endpoints) or not db_endpoints:
- self.endpoints = [endpoint]
- self._update_api_endpoints_db(self.endpoints)
- else:
- self.endpoints = db_endpoints
- self.api_proxy = api_proxy
- self.username = username
- self.password = password
- self.cacert = cacert
- self.loop = loop or asyncio.get_event_loop()
self.n2vc = n2vc
+ self.vca_connection = vca_connection
- # Generate config for models
- self.model_config = model_config
-
+ self.loop = loop or asyncio.get_event_loop()
self.loop.set_exception_handler(self.handle_exception)
self.creating_model = asyncio.Lock(loop=self.loop)
- self.log.debug("Libjuju initialized!")
-
- self.health_check_task = self._create_health_check_task()
+ if self.vca_connection.is_default:
+ self.health_check_task = self._create_health_check_task()
def _create_health_check_task(self):
return self.loop.create_task(self.health_check())
- async def get_controller(self, timeout: float = 15.0) -> Controller:
+ async def get_controller(self, timeout: float = 60.0) -> Controller:
"""
Get controller
controller = Controller(loop=self.loop)
await asyncio.wait_for(
controller.connect(
- endpoint=self.endpoints,
- username=self.username,
- password=self.password,
- cacert=self.cacert,
+ endpoint=self.vca_connection.data.endpoints,
+ username=self.vca_connection.data.user,
+ password=self.vca_connection.data.secret,
+ cacert=self.vca_connection.data.cacert,
),
timeout=timeout,
)
- endpoints = await controller.api_endpoints
- if self.endpoints != endpoints:
- self.endpoints = endpoints
- self._update_api_endpoints_db(self.endpoints)
+ if self.vca_connection.is_default:
+ endpoints = await controller.api_endpoints
+ if not all(
+ endpoint in self.vca_connection.endpoints for endpoint in endpoints
+ ):
+ await self.vca_connection.update_endpoints(endpoints)
return controller
except asyncio.CancelledError as e:
raise e
except Exception as e:
self.log.error(
- "Failed connecting to controller: {}...".format(self.endpoints)
+ "Failed connecting to controller: {}... {}".format(
+ self.vca_connection.data.endpoints, e
+ )
)
if controller:
await self.disconnect_controller(controller)
if controller:
await controller.disconnect()
- async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
+ @retry(attempts=3, delay=5, timeout=None)
+ async def add_model(self, model_name: str, cloud: VcaCloud):
"""
Create model
:param: model_name: Model name
- :param: cloud_name: Cloud name
- :param: credential_name: Credential name to use for adding the model
- If not specified, same name as the cloud will be used.
+ :param: cloud: Cloud object
"""
# Get controller
self.log.debug("Creating model {}".format(model_name))
model = await controller.add_model(
model_name,
- config=self.model_config,
- cloud_name=cloud_name,
- credential_name=credential_name or cloud_name,
+ config=self.vca_connection.data.model_config,
+ cloud_name=cloud.name,
+ credential_name=cloud.credential_name,
)
+ except JujuAPIError as e:
+ if "already exists" in e.message:
+ pass
+ else:
+ raise e
finally:
if model:
await self.disconnect_model(model)
actions.update(application_actions)
# Get status of all actions
for application_action in actions:
- app_action_status_list = await model.get_action_status(name=application_action)
+ app_action_status_list = await model.get_action_status(
+ name=application_action
+ )
for action_id, action_status in app_action_status_list.items():
- executed_action = {"id": action_id, "action": application_action,
- "status": action_status}
+ executed_action = {
+ "id": action_id,
+ "action": application_action,
+ "status": action_status,
+ }
# Get action output by id
action_status = await model.get_action_output(executed_action["id"])
for k, v in action_status.items():
executed_action[k] = v
executed_actions.append(executed_action)
except Exception as e:
- raise JujuError("Error in getting executed actions for model: {}. Error: {}"
- .format(model_name, str(e)))
+ raise JujuError(
+ "Error in getting executed actions for model: {}. Error: {}".format(
+ model_name, str(e)
+ )
+ )
finally:
if model:
await self.disconnect_model(model)
await self.disconnect_controller(controller)
return executed_actions
- async def get_application_configs(self, model_name: str, application_name: str) -> dict:
+ async def get_application_configs(
+ self, model_name: str, application_name: str
+ ) -> dict:
"""
Get available configs for an application.
controller = await self.get_controller()
try:
model = await self.get_model(controller, model_name)
- application = self._get_application(model, application_name=application_name)
+ application = self._get_application(
+ model, application_name=application_name
+ )
application_configs = await application.get_config()
except Exception as e:
- raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
- .format(application_name, model_name, str(e)))
+ raise JujuError(
+ "Error in getting configs for application: {} in model: {}. Error: {}".format(
+ application_name, model_name, str(e)
+ )
+ )
finally:
if model:
await self.disconnect_model(model)
await self.disconnect_controller(controller)
return application_configs
- async def get_model(
- self, controller: Controller, model_name: str, id=None
- ) -> Model:
+ @retry(attempts=3, delay=5)
+ async def get_model(self, controller: Controller, model_name: str) -> Model:
"""
Get model from controller
"""
return await controller.get_model(model_name)
- async def model_exists(
- self, model_name: str, controller: Controller = None
- ) -> bool:
+ async def model_exists(self, model_name: str, controller: Controller = None) -> bool:
"""
Check if model exists
total_timeout=total_timeout,
db_dict=db_dict,
n2vc=self.n2vc,
+ vca_id=self.vca_connection._vca_id,
)
finally:
await self.disconnect_model(model)
connection=connection,
nonce=params.nonce,
machine_id=machine_id,
- proxy=self.api_proxy,
+ proxy=self.vca_connection.data.api_proxy,
series=params.series,
)
)
total_timeout=total_timeout,
db_dict=db_dict,
n2vc=self.n2vc,
+ vca_id=self.vca_connection._vca_id,
)
except Exception as e:
raise e
total_timeout=total_timeout,
db_dict=db_dict,
n2vc=self.n2vc,
+ vca_id=self.vca_connection._vca_id,
)
self.log.debug(
"Application {} is ready in model {}".format(
# because the leader elected hook has not been triggered yet.
# Therefore, we are doing some retries. If it happens again,
# re-open bug 1236
- attempts = 3
- time_between_retries = 10
- unit = None
- for _ in range(attempts):
- unit = await self._get_leader_unit(application)
- if unit is None:
- await asyncio.sleep(time_between_retries)
- else:
- break
- if unit is None:
- raise JujuLeaderUnitNotFound(
- "Cannot execute action: leader unit not found"
- )
+ unit = await self._get_leader_unit(application)
actions = await application.get_actions()
total_timeout=total_timeout,
db_dict=db_dict,
n2vc=self.n2vc,
+ vca_id=self.vca_connection._vca_id,
)
output = await model.get_action_output(action_uuid=action.entity_id)
await self.disconnect_model(model)
await self.disconnect_controller(controller)
- def _get_api_endpoints_db(self) -> [str]:
- """
- Get API Endpoints from DB
-
- :return: List of API endpoints
- """
- self.log.debug("Getting endpoints from database")
-
- juju_info = self.db.get_one(
- DB_DATA.api_endpoints.table,
- q_filter=DB_DATA.api_endpoints.filter,
- fail_on_empty=False,
- )
- if juju_info and DB_DATA.api_endpoints.key in juju_info:
- return juju_info[DB_DATA.api_endpoints.key]
-
- def _update_api_endpoints_db(self, endpoints: [str]):
- """
- Update API endpoints in Database
-
- :param: List of endpoints
- """
- self.log.debug("Saving endpoints {} in database".format(endpoints))
-
- juju_info = self.db.get_one(
- DB_DATA.api_endpoints.table,
- q_filter=DB_DATA.api_endpoints.filter,
- fail_on_empty=False,
- )
- # If it doesn't, then create it
- if not juju_info:
- try:
- self.db.create(
- DB_DATA.api_endpoints.table,
- DB_DATA.api_endpoints.filter,
- )
- except DbException as e:
- # Racing condition: check if another N2VC worker has created it
- juju_info = self.db.get_one(
- DB_DATA.api_endpoints.table,
- q_filter=DB_DATA.api_endpoints.filter,
- fail_on_empty=False,
- )
- if not juju_info:
- raise e
- self.db.set_one(
- DB_DATA.api_endpoints.table,
- DB_DATA.api_endpoints.filter,
- {DB_DATA.api_endpoints.key: endpoints},
- )
-
def handle_exception(self, loop, context):
# All unhandled exceptions by libjuju are handled here.
pass
finally:
await self.disconnect_controller(controller)
+ @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
async def _get_leader_unit(self, application: Application) -> Unit:
unit = None
for u in application.units:
if await u.is_leader_from_status():
unit = u
break
+ if not unit:
+ raise Exception()
return unit
- async def get_cloud_credentials(self, cloud_name: str, credential_name: str):
+ async def get_cloud_credentials(self, cloud: Cloud) -> typing.List:
+ """
+ Get cloud credentials
+
+ :param: cloud: Cloud object. The returned credentials will be from this cloud.
+
+ :return: List of credentials object associated to the specified cloud
+
+ """
controller = await self.get_controller()
try:
facade = client.CloudFacade.from_connection(controller.connection())
- cloud_cred_tag = tag.credential(cloud_name, self.username, credential_name)
+ cloud_cred_tag = tag.credential(
+ cloud.name, self.vca_connection.data.user, cloud.credential_name
+ )
params = [client.Entity(cloud_cred_tag)]
return (await facade.Credential(params)).results
finally:
fs: object,
log: object,
loop: object,
- url: str,
- username: str,
- vca_config: dict,
on_update_db=None,
**kwargs,
):
FsBase)
:param object log: the logging object to log to
:param object loop: the loop to use for asyncio (default current thread loop)
- :param str url: a string that how to connect to the VCA (if needed, IP and port
- can be obtained from there)
- :param str username: the username to authenticate with VCA
- :param dict vca_config: Additional parameters for the specific VCA. For example,
- for juju it will contain:
- secret: The password to authenticate with
- public_key: The contents of the juju public SSH key
- ca_cert str: The CA certificate used to authenticate
:param on_update_db: callback called when n2vc connector updates database.
Received arguments:
table: e.g. "nsrs"
if fs is None:
raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])
- self.log.info(
- "url={}, username={}, vca_config={}".format(
- url,
- username,
- {
- k: v
- for k, v in vca_config.items()
- if k
- not in ("host", "port", "user", "secret", "public_key", "ca_cert")
- },
- )
- )
-
# store arguments into self
self.db = db
self.fs = fs
self.loop = loop or asyncio.get_event_loop()
- self.url = url
- self.username = username
- self.vca_config = vca_config
self.on_update_db = on_update_db
# generate private/public key-pair
detailed_status: str,
vca_status: str,
entity_type: str,
+ vca_id: str = None,
):
+ """
+ Write application status to database
+
+ :param: db_dict: DB dictionary
+ :param: status: Status of the application
+ :param: detailed_status: Detailed status
+ :param: vca_status: VCA status
+ :param: entity_type: Entity type ("application", "machine, and "action")
+ :param: vca_id: Id of the VCA. If None, the default VCA will be used.
+ """
if not db_dict:
self.log.debug("No db_dict => No database write")
return
if self.on_update_db:
if asyncio.iscoroutinefunction(self.on_update_db):
await self.on_update_db(
- the_table, the_filter, the_path, update_dict
+ the_table, the_filter, the_path, update_dict, vca_id=vca_id
)
else:
- self.on_update_db(the_table, the_filter, the_path, update_dict)
+ self.on_update_db(the_table, the_filter, the_path, update_dict, vca_id=vca_id)
except DbException as e:
if e.http_code == HTTPStatus.NOT_FOUND:
import asyncio
import logging
-import os
-from n2vc.config import ModelConfig
+from n2vc.config import EnvironConfig
from n2vc.exceptions import (
N2VCBadArgumentsException,
N2VCException,
N2VCExecutionException,
# N2VCNotFound,
MethodNotImplemented,
- JujuK8sProxycharmNotSupported,
)
from n2vc.n2vc_conn import N2VCConnector
from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
from n2vc.libjuju import Libjuju
-from n2vc.utils import base64_to_cacert
+from n2vc.store import MotorStore
+from n2vc.vca.connection import get_connection
class N2VCJujuConnector(N2VCConnector):
"""
-####################################################################################
-################################### P U B L I C ####################################
-####################################################################################
+ ####################################################################################
+ ################################### P U B L I C ####################################
+ ####################################################################################
"""
BUILT_IN_CLOUDS = ["localhost", "microk8s"]
+ libjuju = None
def __init__(
self,
fs: object,
log: object = None,
loop: object = None,
- url: str = "127.0.0.1:17070",
- username: str = "admin",
- vca_config: dict = None,
on_update_db=None,
):
- """Initialize juju N2VC connector
+ """
+ Constructor
+
+ :param: db: Database object from osm_common
+ :param: fs: Filesystem object from osm_common
+ :param: log: Logger
+ :param: loop: Asyncio loop
+ :param: on_update_db: Callback function to be called for updating the database.
"""
# parent class constructor
fs=fs,
log=log,
loop=loop,
- url=url,
- username=username,
- vca_config=vca_config,
on_update_db=on_update_db,
)
self.log.info("Initializing N2VC juju connector...")
- """
- ##############################################################
- # check arguments
- ##############################################################
- """
-
- # juju URL
- if url is None:
- raise N2VCBadArgumentsException("Argument url is mandatory", ["url"])
- url_parts = url.split(":")
- if len(url_parts) != 2:
- raise N2VCBadArgumentsException(
- "Argument url: bad format (localhost:port) -> {}".format(url), ["url"]
- )
- self.hostname = url_parts[0]
- try:
- self.port = int(url_parts[1])
- except ValueError:
- raise N2VCBadArgumentsException(
- "url port must be a number -> {}".format(url), ["url"]
- )
-
- # juju USERNAME
- if username is None:
- raise N2VCBadArgumentsException(
- "Argument username is mandatory", ["username"]
- )
-
- # juju CONFIGURATION
- if vca_config is None:
- raise N2VCBadArgumentsException(
- "Argument vca_config is mandatory", ["vca_config"]
- )
-
- if "secret" in vca_config:
- self.secret = vca_config["secret"]
- else:
- raise N2VCBadArgumentsException(
- "Argument vca_config.secret is mandatory", ["vca_config.secret"]
- )
-
- # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
- # if exists, it will be written in lcm container: _create_juju_public_key()
- if "public_key" in vca_config:
- self.public_key = vca_config["public_key"]
- else:
- self.public_key = None
-
- # TODO: Verify ca_cert is valid before using. VCA will crash
- # if the ca_cert isn't formatted correctly.
-
- self.ca_cert = vca_config.get("ca_cert")
- if self.ca_cert:
- self.ca_cert = base64_to_cacert(vca_config["ca_cert"])
-
- if "api_proxy" in vca_config and vca_config["api_proxy"] != "":
- self.api_proxy = vca_config["api_proxy"]
- self.log.debug(
- "api_proxy for native charms configured: {}".format(self.api_proxy)
- )
- else:
- self.warning(
- "api_proxy is not configured"
- )
- self.api_proxy = None
-
- model_config = ModelConfig(vca_config)
-
- self.cloud = vca_config.get('cloud')
- self.k8s_cloud = None
- if "k8s_cloud" in vca_config:
- self.k8s_cloud = vca_config.get("k8s_cloud")
- self.log.debug('Arguments have been checked')
-
- # juju data
- self.controller = None # it will be filled when connect to juju
- self.juju_models = {} # model objects for every model_name
- self.juju_observers = {} # model observers for every model_name
- self._connecting = (
- False # while connecting to juju (to avoid duplicate connections)
- )
- self._authenticated = (
- False # it will be True when juju connection be stablished
- )
- self._creating_model = False # True during model creation
- self.libjuju = Libjuju(
- endpoint=self.url,
- api_proxy=self.api_proxy,
- username=self.username,
- password=self.secret,
- cacert=self.ca_cert,
- loop=self.loop,
- log=self.log,
- db=self.db,
- n2vc=self,
- model_config=model_config,
- )
-
- # create juju pub key file in lcm container at
- # ./local/share/juju/ssh/juju_id_rsa.pub
- self._create_juju_public_key()
+ db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
+ self._store = MotorStore(db_uri)
+ self.loading_libjuju = asyncio.Lock(loop=self.loop)
self.log.info("N2VC juju connector initialized")
- async def get_status(self, namespace: str, yaml_format: bool = True):
+ async def get_status(
+ self, namespace: str, yaml_format: bool = True, vca_id: str = None
+ ):
+ """
+ Get status from all juju models from a VCA
+
+ :param namespace: we obtain ns from namespace
+ :param yaml_format: returns a yaml string
+ :param: vca_id: VCA ID from which the status will be retrieved.
+ """
+ # TODO: Review where is this function used. It is not optimal at all to get the status
+ # from all the juju models of a particular VCA. Additionally, these models might
+ # not have been deployed by OSM, in that case we are getting information from
+ # deployments outside of OSM's scope.
# self.log.info('Getting NS status. namespace: {}'.format(namespace))
+ libjuju = await self._get_libjuju(vca_id)
_nsi_id, ns_id, _vnf_id, _vdu_id, _vdu_count = self._get_namespace_components(
namespace=namespace
raise N2VCBadArgumentsException(msg, ["namespace"])
status = {}
- models = await self.libjuju.list_models(contains=ns_id)
+ models = await libjuju.list_models(contains=ns_id)
for m in models:
- status[m] = await self.libjuju.get_model_status(m)
+ status[m] = await libjuju.get_model_status(m)
+
if yaml_format:
return obj_to_yaml(status)
else:
return obj_to_dict(status)
- async def update_vca_status(self, vcastatus: dict):
+ async def update_vca_status(self, vcastatus: dict, vca_id: str = None):
"""
Add all configs, actions, executed actions of all applications in a model to vcastatus dict.
+
:param vcastatus: dict containing vcaStatus
+ :param: vca_id: VCA ID
+
:return: None
"""
try:
+ libjuju = await self._get_libjuju(vca_id)
for model_name in vcastatus:
# Adding executed actions
vcastatus[model_name]["executedActions"] = \
- await self.libjuju.get_executed_actions(model_name)
+ await libjuju.get_executed_actions(model_name)
for application in vcastatus[model_name]["applications"]:
# Adding application actions
vcastatus[model_name]["applications"][application]["actions"] = \
- await self.libjuju.get_actions(application, model_name)
+ await libjuju.get_actions(application, model_name)
# Adding application configs
vcastatus[model_name]["applications"][application]["configs"] = \
- await self.libjuju.get_application_configs(model_name, application)
+ await libjuju.get_application_configs(model_name, application)
except Exception as e:
self.log.debug("Error in updating vca status: {}".format(str(e)))
reuse_ee_id: str = None,
progress_timeout: float = None,
total_timeout: float = None,
- cloud_name: str = None,
- credential_name: str = None,
+ vca_id: str = None,
) -> (str, dict):
+ """
+ Create an Execution Environment. Returns when it is created or raises an
+ exception on failing
+
+ :param: namespace: Contains a dot separate string.
+ LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
+ :param: db_dict: where to write to database when the status changes.
+ It contains a dictionary with {collection: str, filter: {}, path: str},
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
+ "_admin.deployed.VCA.3"}
+ :param: reuse_ee_id: ee id from an older execution. It allows us to reuse an
+ older environment
+ :param: progress_timeout: Progress timeout
+ :param: total_timeout: Total timeout
+ :param: vca_id: VCA ID
+
+ :returns: id of the new execution environment and credentials for it
+ (credentials can contains hostname, username, etc depending on underlying cloud)
+ """
self.log.info(
"Creating execution environment. namespace: {}, reuse_ee_id: {}".format(
namespace, reuse_ee_id
)
)
+ libjuju = await self._get_libjuju(vca_id)
machine_id = None
if reuse_ee_id:
# create or reuse a new juju machine
try:
- if not await self.libjuju.model_exists(model_name):
- cloud = cloud_name or self.cloud
- credential = credential_name or cloud_name if cloud_name else self.cloud
- await self.libjuju.add_model(
+ if not await libjuju.model_exists(model_name):
+ await libjuju.add_model(
model_name,
- cloud_name=cloud,
- credential_name=credential
+ libjuju.vca_connection.lxd_cloud,
)
- machine, new = await self.libjuju.create_machine(
+ machine, new = await libjuju.create_machine(
model_name=model_name,
machine_id=machine_id,
db_dict=db_dict,
db_dict: dict,
progress_timeout: float = None,
total_timeout: float = None,
- cloud_name: str = None,
- credential_name: str = None,
+ vca_id: str = None,
) -> str:
-
+ """
+ Register an existing execution environment at the VCA
+
+ :param: namespace: Contains a dot separate string.
+ LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
+ :param: credentials: credentials to access the existing execution environment
+ (it can contains hostname, username, path to private key,
+ etc depending on underlying cloud)
+ :param: db_dict: where to write to database when the status changes.
+ It contains a dictionary with {collection: str, filter: {}, path: str},
+ e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
+ "_admin.deployed.VCA.3"}
+ :param: reuse_ee_id: ee id from an older execution. It allows us to reuse an
+ older environment
+ :param: progress_timeout: Progress timeout
+ :param: total_timeout: Total timeout
+ :param: vca_id: VCA ID
+
+ :returns: id of the execution environment
+ """
self.log.info(
"Registering execution environment. namespace={}, credentials={}".format(
namespace, credentials
)
)
+ libjuju = await self._get_libjuju(vca_id)
if credentials is None:
raise N2VCBadArgumentsException(
# register machine on juju
try:
- if not await self.libjuju.model_exists(model_name):
- cloud = cloud_name or self.cloud
- credential = credential_name or cloud_name if cloud_name else self.cloud
- await self.libjuju.add_model(
+ if not await libjuju.model_exists(model_name):
+ await libjuju.add_model(
model_name,
- cloud_name=cloud,
- credential_name=credential
+ libjuju.vca_connection.lxd_cloud,
)
- machine_id = await self.libjuju.provision_machine(
+ machine_id = await libjuju.provision_machine(
model_name=model_name,
hostname=hostname,
username=username,
total_timeout: float = None,
config: dict = None,
num_units: int = 1,
+ vca_id: str = None,
):
+ """
+ Install the software inside the execution environment identified by ee_id
+
+ :param: ee_id: the id of the execution environment returned by
+ create_execution_environment or register_execution_environment
+ :param: artifact_path: where to locate the artifacts (parent folder) using
+ the self.fs
+ the final artifact path will be a combination of this
+ artifact_path and additional string from the config_dict
+ (e.g. charm name)
+ :param: db_dict: where to write into database when the status changes.
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ :param: progress_timeout: Progress timeout
+ :param: total_timeout: Total timeout
+ :param: config: Dictionary with deployment config information.
+ :param: num_units: Number of units to deploy of a particular charm.
+ :param: vca_id: VCA ID
+ """
self.log.info(
(
"artifact path: {}, db_dict: {}"
).format(ee_id, artifact_path, db_dict)
)
+ libjuju = await self._get_libjuju(vca_id)
# check arguments
if ee_id is None or len(ee_id) == 0:
full_path = self.fs.path + "/" + artifact_path
try:
- await self.libjuju.deploy_charm(
+ await libjuju.deploy_charm(
model_name=model_name,
application_name=application_name,
path=full_path,
progress_timeout: float = None,
total_timeout: float = None,
config: dict = None,
- cloud_name: str = None,
- credential_name: str = None,
+ vca_id: str = None,
) -> str:
"""
Install a k8s proxy charm
{collection: <str>, filter: {}, path: <str>},
e.g. {collection: "nsrs", filter:
{_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
- :param float progress_timeout:
- :param float total_timeout:
+ :param: progress_timeout: Progress timeout
+ :param: total_timeout: Total timeout
:param config: Dictionary with additional configuration
- :param cloud_name: Cloud Name in which the charms will be deployed
- :param credential_name: Credential Name to use in the cloud_name.
- If not set, cloud_name will be used as credential_name
+ :param vca_id: VCA ID
:returns ee_id: execution environment id.
"""
- self.log.info('Installing k8s proxy charm: {}, artifact path: {}, db_dict: {}'
- .format(charm_name, artifact_path, db_dict))
-
- if not self.k8s_cloud:
- raise JujuK8sProxycharmNotSupported("There is not k8s_cloud available")
+ self.log.info(
+ "Installing k8s proxy charm: {}, artifact path: {}, db_dict: {}".format(
+ charm_name, artifact_path, db_dict
+ )
+ )
+ libjuju = await self._get_libjuju(vca_id)
if artifact_path is None or len(artifact_path) == 0:
raise N2VCBadArgumentsException(
message="artifact_path is mandatory", bad_args=["artifact_path"]
)
if db_dict is None:
- raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
+ raise N2VCBadArgumentsException(
+ message="db_dict is mandatory", bad_args=["db_dict"]
+ )
# remove // in charm path
- while artifact_path.find('//') >= 0:
- artifact_path = artifact_path.replace('//', '/')
+ while artifact_path.find("//") >= 0:
+ artifact_path = artifact_path.replace("//", "/")
# check charm path
if not self.fs.file_exists(artifact_path, mode="dir"):
- msg = 'artifact path does not exist: {}'.format(artifact_path)
- raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])
+ msg = "artifact path does not exist: {}".format(artifact_path)
+ raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
- if artifact_path.startswith('/'):
+ if artifact_path.startswith("/"):
full_path = self.fs.path + artifact_path
else:
- full_path = self.fs.path + '/' + artifact_path
+ full_path = self.fs.path + "/" + artifact_path
_, ns_id, _, _, _ = self._get_namespace_components(namespace=namespace)
- model_name = '{}-k8s'.format(ns_id)
- if not await self.libjuju.model_exists(model_name):
- cloud = cloud_name or self.k8s_cloud
- credential = credential_name or cloud_name if cloud_name else self.k8s_cloud
- await self.libjuju.add_model(
+ model_name = "{}-k8s".format(ns_id)
+ if not await libjuju.model_exists(model_name):
+ await libjuju.add_model(
model_name,
- cloud_name=cloud,
- credential_name=credential
+ libjuju.vca_connection.k8s_cloud,
)
application_name = self._get_application_name(namespace)
try:
- await self.libjuju.deploy_charm(
+ await libjuju.deploy_charm(
model_name=model_name,
application_name=application_name,
path=full_path,
db_dict=db_dict,
progress_timeout=progress_timeout,
total_timeout=total_timeout,
- config=config
+ config=config,
)
except Exception as e:
- raise N2VCException(message='Error deploying charm: {}'.format(e))
+ raise N2VCException(message="Error deploying charm: {}".format(e))
- self.log.info('K8s proxy charm installed')
+ self.log.info("K8s proxy charm installed")
ee_id = N2VCJujuConnector._build_ee_id(
model_name=model_name,
application_name=application_name,
db_dict: dict,
progress_timeout: float = None,
total_timeout: float = None,
+ vca_id: str = None,
) -> str:
+ """
+ Get Execution environment ssh public key
+
+ :param: ee_id: the id of the execution environment returned by
+ create_execution_environment or register_execution_environment
+ :param: db_dict: where to write into database when the status changes.
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ :param: progress_timeout: Progress timeout
+ :param: total_timeout: Total timeout
+ :param vca_id: VCA ID
+ :returns: public key of the execution environment
+ For the case of juju proxy charm ssh-layered, it is the one
+ returned by 'get-ssh-public-key' primitive.
+ It raises a N2VC exception if fails
+ """
self.log.info(
(
"Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}"
).format(ee_id, db_dict)
)
+ libjuju = await self._get_libjuju(vca_id)
# check arguments
if ee_id is None or len(ee_id) == 0:
# execute action: generate-ssh-key
try:
- output, _status = await self.libjuju.execute_action(
+ output, _status = await libjuju.execute_action(
model_name=model_name,
application_name=application_name,
action_name="generate-ssh-key",
# execute action: get-ssh-public-key
try:
- output, _status = await self.libjuju.execute_action(
+ output, _status = await libjuju.execute_action(
model_name=model_name,
application_name=application_name,
action_name="get-ssh-public-key",
# return public key if exists
return output["pubkey"] if "pubkey" in output else output
- async def get_metrics(self, model_name: str, application_name: str) -> dict:
- return await self.libjuju.get_metrics(model_name, application_name)
+ async def get_metrics(
+ self, model_name: str, application_name: str, vca_id: str = None
+ ) -> dict:
+ """
+ Get metrics from application
+
+ :param: model_name: Model name
+ :param: application_name: Application name
+ :param: vca_id: VCA ID
+
+ :return: Dictionary with obtained metrics
+ """
+ libjuju = await self._get_libjuju(vca_id)
+ return await libjuju.get_metrics(model_name, application_name)
async def add_relation(
- self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
+ self,
+ ee_id_1: str,
+ ee_id_2: str,
+ endpoint_1: str,
+ endpoint_2: str,
+ vca_id: str = None,
):
+ """
+ Add relation between two charmed endpoints
+ :param: ee_id_1: The id of the first execution environment
+ :param: ee_id_2: The id of the second execution environment
+ :param: endpoint_1: The endpoint in the first execution environment
+ :param: endpoint_2: The endpoint in the second execution environment
+ :param: vca_id: VCA ID
+ """
self.log.debug(
"adding new relation between {} and {}, endpoints: {}, {}".format(
ee_id_1, ee_id_2, endpoint_1, endpoint_2
)
)
+ libjuju = await self._get_libjuju(vca_id)
# check arguments
if not ee_id_1:
# add juju relations between two applications
try:
- await self.libjuju.add_relation(
+ await libjuju.add_relation(
model_name=model_1,
endpoint_1="{}:{}".format(app_1, endpoint_1),
endpoint_2="{}:{}".format(app_2, endpoint_2),
raise MethodNotImplemented()
async def delete_namespace(
- self, namespace: str, db_dict: dict = None, total_timeout: float = None
+ self,
+ namespace: str,
+ db_dict: dict = None,
+ total_timeout: float = None,
+ vca_id: str = None,
):
+ """
+ Remove a network scenario and its execution environments
+ :param: namespace: [<nsi-id>].<ns-id>
+ :param: db_dict: where to write into database when the status changes.
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ :param: total_timeout: Total timeout
+ :param: vca_id: VCA ID
+ """
self.log.info("Deleting namespace={}".format(namespace))
+ libjuju = await self._get_libjuju(vca_id)
# check arguments
if namespace is None:
)
if ns_id is not None:
try:
- models = await self.libjuju.list_models(contains=ns_id)
+ models = await libjuju.list_models(contains=ns_id)
for model in models:
- await self.libjuju.destroy_model(
+ await libjuju.destroy_model(
model_name=model, total_timeout=total_timeout
)
except Exception as e:
self.log.info("Namespace {} deleted".format(namespace))
async def delete_execution_environment(
- self, ee_id: str, db_dict: dict = None, total_timeout: float = None,
- scaling_in: bool = False
+ self,
+ ee_id: str,
+ db_dict: dict = None,
+ total_timeout: float = None,
+ scaling_in: bool = False,
+ vca_id: str = None,
):
+ """
+ Delete an execution environment
+ :param str ee_id: id of the execution environment to delete
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ :param: total_timeout: Total timeout
+ :param: scaling_in: Boolean to indicate if is it a scaling in operation
+ :param: vca_id: VCA ID
+ """
self.log.info("Deleting execution environment ee_id={}".format(ee_id))
+ libjuju = await self._get_libjuju(vca_id)
# check arguments
if ee_id is None:
if not scaling_in:
# destroy the model
# TODO: should this be removed?
- await self.libjuju.destroy_model(
+ await libjuju.destroy_model(
model_name=model_name,
total_timeout=total_timeout,
)
else:
# destroy the application
- await self.libjuju.destroy_application(
+ await libjuju.destroy_application(
model_name=model_name,
application_name=application_name,
total_timeout=total_timeout,
db_dict: dict = None,
progress_timeout: float = None,
total_timeout: float = None,
+ vca_id: str = None,
) -> str:
+ """
+ Execute a primitive in the execution environment
+
+ :param: ee_id: the one returned by create_execution_environment or
+ register_execution_environment
+ :param: primitive_name: must be one defined in the software. There is one
+ called 'config', where, for the proxy case, the 'credentials' of VM are
+ provided
+ :param: params_dict: parameters of the action
+ :param: db_dict: where to write into database when the status changes.
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ :param: progress_timeout: Progress timeout
+ :param: total_timeout: Total timeout
+ :param: vca_id: VCA ID
+ :returns str: primitive result, if ok. It raises exceptions in case of fail
+ """
self.log.info(
"Executing primitive: {} on ee: {}, params: {}".format(
primitive_name, ee_id, params_dict
)
)
+ libjuju = await self._get_libjuju(vca_id)
# check arguments
if ee_id is None or len(ee_id) == 0:
if primitive_name == "config":
# Special case: config primitive
try:
- await self.libjuju.configure_application(
+ await libjuju.configure_application(
model_name=model_name,
application_name=application_name,
config=params_dict,
)
- actions = await self.libjuju.get_actions(
- application_name=application_name, model_name=model_name,
+ actions = await libjuju.get_actions(
+ application_name=application_name,
+ model_name=model_name,
)
self.log.debug(
"Application {} has these actions: {}".format(
for _ in range(num_retries):
try:
self.log.debug("Executing action verify-ssh-credentials...")
- output, ok = await self.libjuju.execute_action(
+ output, ok = await libjuju.execute_action(
model_name=model_name,
application_name=application_name,
action_name="verify-ssh-credentials",
return "CONFIG OK"
else:
try:
- output, status = await self.libjuju.execute_action(
+ output, status = await libjuju.execute_action(
model_name=model_name,
application_name=application_name,
action_name=primitive_name,
primitive_name=primitive_name,
)
- async def disconnect(self):
+ async def disconnect(self, vca_id: str = None):
+ """
+ Disconnect from VCA
+
+ :param: vca_id: VCA ID
+ """
self.log.info("closing juju N2VC...")
+ libjuju = await self._get_libjuju(vca_id)
try:
- await self.libjuju.disconnect()
+ await libjuju.disconnect()
except Exception as e:
raise N2VCConnectionException(
- message="Error disconnecting controller: {}".format(e), url=self.url
+ message="Error disconnecting controller: {}".format(e),
+ url=libjuju.vca_connection.data.endpoints,
)
"""
####################################################################################
"""
+ async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
+ """
+ Get libjuju object
+
+ :param: vca_id: VCA ID
+ If None, get a libjuju object with a Connection to the default VCA
+ Else, geta libjuju object with a Connection to the specified VCA
+ """
+ if not vca_id:
+ while self.loading_libjuju.locked():
+ await asyncio.sleep(0.1)
+ if not self.libjuju:
+ async with self.loading_libjuju:
+ vca_connection = await get_connection(self._store)
+ self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ return self.libjuju
+ else:
+ vca_connection = await get_connection(self._store, vca_id)
+ return Libjuju(
+ vca_connection,
+ loop=self.loop,
+ log=self.log,
+ n2vc=self,
+ )
+
def _write_ee_id_db(self, db_dict: dict, ee_id: str):
# write ee_id to database: _admin.deployed.VCA.x
return N2VCJujuConnector._format_app_name(application_name)
- def _create_juju_public_key(self):
- """Recreate the Juju public key on lcm container, if needed
- Certain libjuju commands expect to be run from the same machine as Juju
- is bootstrapped to. This method will write the public key to disk in
- that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
- """
-
- # Make sure that we have a public key before writing to disk
- if self.public_key is None or len(self.public_key) == 0:
- if "OSMLCM_VCA_PUBKEY" in os.environ:
- self.public_key = os.getenv("OSMLCM_VCA_PUBKEY", "")
- if len(self.public_key) == 0:
- return
- else:
- return
-
- pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~"))
- file_path = "{}/juju_id_rsa.pub".format(pk_path)
- self.log.debug(
- "writing juju public key to file:\n{}\npublic key: {}".format(
- file_path, self.public_key
- )
- )
- if not os.path.exists(pk_path):
- # create path and write file
- os.makedirs(pk_path)
- with open(file_path, "w") as f:
- self.log.debug("Creating juju public key file: {}".format(file_path))
- f.write(self.public_key)
- else:
- self.log.debug("juju public key file already exists: {}".format(file_path))
-
@staticmethod
def _format_model_name(name: str) -> str:
"""Format the name of the model.
app_name = "z" + app_name
return app_name
+
+ async def validate_vca(self, vca_id: str):
+ """
+ Validate a VCA by connecting/disconnecting to/from it
+
+ :param: vca_id: VCA ID
+ """
+ vca_connection = await get_connection(self._store, vca_id=vca_id)
+ libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self)
+ controller = await libjuju.get_controller()
+ await libjuju.disconnect_controller(controller)
--- /dev/null
+# Copyright 2021 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import asyncio
+from base64 import b64decode
+import re
+import typing
+
+from Crypto.Cipher import AES
+from motor.motor_asyncio import AsyncIOMotorClient
+from n2vc.config import EnvironConfig
+from n2vc.vca.connection_data import ConnectionData
+from osm_common.dbmongo import DbMongo, DbException
+
+DB_NAME = "osm"
+
+
+class Store(abc.ABC):
+ @abc.abstractmethod
+ async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
+ """
+ Get VCA connection data
+
+ :param: vca_id: VCA ID
+
+ :returns: ConnectionData with the information of the database
+ """
+
+ @abc.abstractmethod
+ async def update_vca_endpoints(self, hosts: typing.List[str], vca_id: str):
+ """
+ Update VCA endpoints
+
+ :param: endpoints: List of endpoints to write in the database
+ :param: vca_id: VCA ID
+ """
+
+ @abc.abstractmethod
+ async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
+ """
+ Get list if VCA endpoints
+
+ :param: vca_id: VCA ID
+
+ :returns: List of endpoints
+ """
+
+ @abc.abstractmethod
+ async def get_vca_id(self, vim_id: str = None) -> str:
+ """
+ Get VCA id for a VIM account
+
+ :param: vim_id: Vim account ID
+ """
+
+
+class DbMongoStore(Store):
+ def __init__(self, db: DbMongo):
+ """
+ Constructor
+
+ :param: db: osm_common.dbmongo.DbMongo object
+ """
+ self.db = db
+
+ async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
+ """
+ Get VCA connection data
+
+ :param: vca_id: VCA ID
+
+ :returns: ConnectionData with the information of the database
+ """
+ data = self.db.get_one("vca", q_filter={"_id": vca_id})
+ self.db.encrypt_decrypt_fields(
+ data,
+ "decrypt",
+ ["secret", "cacert"],
+ schema_version=data["schema_version"],
+ salt=data["_id"],
+ )
+ return ConnectionData(**data)
+
+ async def update_vca_endpoints(
+ self, endpoints: typing.List[str], vca_id: str = None
+ ):
+ """
+ Update VCA endpoints
+
+ :param: endpoints: List of endpoints to write in the database
+ :param: vca_id: VCA ID
+ """
+ if vca_id:
+ data = self.db.get_one("vca", q_filter={"_id": vca_id})
+ data["endpoints"] = endpoints
+ self._update("vca", vca_id, data)
+ else:
+ # The default VCA. Data for the endpoints is in a different place
+ juju_info = self._get_juju_info()
+ # If it doesn't, then create it
+ if not juju_info:
+ try:
+ self.db.create(
+ "vca",
+ {"_id": "juju"},
+ )
+ except DbException as e:
+ # Racing condition: check if another N2VC worker has created it
+ juju_info = self._get_juju_info()
+ if not juju_info:
+ raise e
+ self.db.set_one(
+ "vca",
+ {"_id": "juju"},
+ {"api_endpoints": endpoints},
+ )
+
+ async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
+ """
+ Get list if VCA endpoints
+
+ :param: vca_id: VCA ID
+
+ :returns: List of endpoints
+ """
+ endpoints = []
+ if vca_id:
+ endpoints = self.get_vca_connection_data(vca_id).endpoints
+ else:
+ juju_info = self._get_juju_info()
+ if juju_info and "api_endpoints" in juju_info:
+ endpoints = juju_info["api_endpoints"]
+ return endpoints
+
+ async def get_vca_id(self, vim_id: str = None) -> str:
+ """
+ Get VCA ID from the database for a given VIM account ID
+
+ :param: vim_id: VIM account ID
+ """
+ return (
+ self.db.get_one(
+ "vim_accounts",
+ q_filter={"_id": vim_id},
+ fail_on_empty=False,
+ ).get("vca")
+ if vim_id
+ else None
+ )
+
+ def _update(self, collection: str, id: str, data: dict):
+ """
+ Update object in database
+
+ :param: collection: Collection name
+ :param: id: ID of the object
+ :param: data: Object data
+ """
+ self.db.replace(
+ collection,
+ id,
+ data,
+ )
+
+ def _get_juju_info(self):
+ """Get Juju information (the default VCA) from the admin collection"""
+ return self.db.get_one(
+ "vca",
+ q_filter={"_id": "juju"},
+ fail_on_empty=False,
+ )
+
+
+class MotorStore(Store):
+ def __init__(self, uri: str, loop=None):
+ """
+ Constructor
+
+ :param: uri: Connection string to connect to the database.
+ :param: loop: Asyncio Loop
+ """
+ self._client = AsyncIOMotorClient(uri)
+ self.loop = loop or asyncio.get_event_loop()
+ self._secret_key = None
+ self._config = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"])
+
+ @property
+ def _database(self):
+ return self._client[DB_NAME]
+
+ @property
+ def _vca_collection(self):
+ return self._database["vca"]
+
+ @property
+ def _admin_collection(self):
+ return self._database["admin"]
+
+ @property
+ def _vim_accounts_collection(self):
+ return self._database["vim_accounts"]
+
+ async def get_vca_connection_data(self, vca_id: str) -> ConnectionData:
+ """
+ Get VCA connection data
+
+ :param: vca_id: VCA ID
+
+ :returns: ConnectionData with the information of the database
+ """
+ data = await self._vca_collection.find_one({"_id": vca_id})
+ if not data:
+ raise Exception("vca with id {} not found".format(vca_id))
+ await self.decrypt_fields(
+ data,
+ ["secret", "cacert"],
+ schema_version=data["schema_version"],
+ salt=data["_id"],
+ )
+ return ConnectionData(**data)
+
+ async def update_vca_endpoints(
+ self, endpoints: typing.List[str], vca_id: str = None
+ ):
+ """
+ Update VCA endpoints
+
+ :param: endpoints: List of endpoints to write in the database
+ :param: vca_id: VCA ID
+ """
+ if vca_id:
+ data = await self._vca_collection.find_one({"_id": vca_id})
+ data["endpoints"] = endpoints
+ await self._vca_collection.replace_one({"_id": vca_id}, data)
+ else:
+ # The default VCA. Data for the endpoints is in a different place
+ juju_info = await self._get_juju_info()
+ # If it doesn't, then create it
+ if not juju_info:
+ try:
+ await self._admin_collection.insert_one({"_id": "juju"})
+ except Exception as e:
+ # Racing condition: check if another N2VC worker has created it
+ juju_info = await self._get_juju_info()
+ if not juju_info:
+ raise e
+
+ await self._admin_collection.replace_one(
+ {"_id": "juju"}, {"api_endpoints": endpoints}
+ )
+
+ async def get_vca_endpoints(self, vca_id: str = None) -> typing.List[str]:
+ """
+ Get list if VCA endpoints
+
+ :param: vca_id: VCA ID
+
+ :returns: List of endpoints
+ """
+ endpoints = []
+ if vca_id:
+ endpoints = (await self.get_vca_connection_data(vca_id)).endpoints
+ else:
+ juju_info = await self._get_juju_info()
+ if juju_info and "api_endpoints" in juju_info:
+ endpoints = juju_info["api_endpoints"]
+ return endpoints
+
+ async def get_vca_id(self, vim_id: str = None) -> str:
+ """
+ Get VCA ID from the database for a given VIM account ID
+
+ :param: vim_id: VIM account ID
+ """
+ vca_id = None
+ if vim_id:
+ vim_account = await self._vim_accounts_collection.find_one({"_id": vim_id})
+ if vim_account and "vca" in vim_account:
+ vca_id = vim_account["vca"]
+ return vca_id
+
+ async def _get_juju_info(self):
+ """Get Juju information (the default VCA) from the admin collection"""
+ return await self._admin_collection.find_one({"_id": "juju"})
+
+ # DECRYPT METHODS
+ async def decrypt_fields(
+ self,
+ item: dict,
+ fields: typing.List[str],
+ schema_version: str = None,
+ salt: str = None,
+ ):
+ """
+ Decrypt fields
+
+ Decrypt fields from a dictionary. Follows the same logic as in osm_common.
+
+ :param: item: Dictionary with the keys to be decrypted
+ :param: fields: List of keys to decrypt
+ :param: schema version: Schema version. (i.e. 1.11)
+ :param: salt: Salt for the decryption
+ """
+ flags = re.I
+
+ async def process(_item):
+ if isinstance(_item, list):
+ for elem in _item:
+ await process(elem)
+ elif isinstance(_item, dict):
+ for key, val in _item.items():
+ if isinstance(val, str):
+ if any(re.search(f, key, flags) for f in fields):
+ _item[key] = await self.decrypt(val, schema_version, salt)
+ else:
+ await process(val)
+
+ await process(item)
+
+ async def decrypt(self, value, schema_version=None, salt=None):
+ """
+ Decrypt an encrypted value
+ :param value: value to be decrypted. It is a base64 string
+ :param schema_version: used for known encryption method used. If None or '1.0' no encryption has been done.
+ If '1.1' symmetric AES encryption has been done
+ :param salt: optional salt to be used
+ :return: Plain content of value
+ """
+ if not await self.secret_key or not schema_version or schema_version == "1.0":
+ return value
+ else:
+ secret_key = self._join_secret_key(salt)
+ encrypted_msg = b64decode(value)
+ cipher = AES.new(secret_key)
+ decrypted_msg = cipher.decrypt(encrypted_msg)
+ try:
+ unpadded_private_msg = decrypted_msg.decode().rstrip("\0")
+ except UnicodeDecodeError:
+ raise DbException(
+ "Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
+ http_code=500,
+ )
+ return unpadded_private_msg
+
+ def _join_secret_key(self, update_key: typing.Any):
+ """
+ Join secret key
+
+ :param: update_key: str or bytes with the to update
+ """
+ if isinstance(update_key, str):
+ update_key_bytes = update_key.encode()
+ else:
+ update_key_bytes = update_key
+ new_secret_key = (
+ bytearray(self._secret_key) if self._secret_key else bytearray(32)
+ )
+ for i, b in enumerate(update_key_bytes):
+ new_secret_key[i % 32] ^= b
+ return bytes(new_secret_key)
+
+ @property
+ async def secret_key(self):
+ if self._secret_key:
+ return self._secret_key
+ else:
+ if self.database_key:
+ self._secret_key = self._join_secret_key(self.database_key)
+ version_data = await self._admin_collection.find_one({"_id": "version"})
+ if version_data and version_data.get("serial"):
+ self._secret_key = self._join_secret_key(
+ b64decode(version_data["serial"])
+ )
+ return self._secret_key
+
+ @property
+ def database_key(self):
+ return self._config["database_commonkey"]
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import TestCase
+from unittest.mock import patch
+
+
+from n2vc.config import EnvironConfig, ModelConfig, MODEL_CONFIG_KEYS
+
+
+def generate_os_environ_dict(config, prefix):
+ return {f"{prefix}{k.upper()}": v for k, v in config.items()}
+
+
+class TestEnvironConfig(TestCase):
+ def setUp(self):
+ self.config = {"host": "1.2.3.4", "port": "17070", "k8s_cloud": "k8s"}
+
+ @patch("os.environ.items")
+ def test_environ_config_lcm(self, mock_environ_items):
+ envs = generate_os_environ_dict(self.config, "OSMLCM_VCA_")
+ envs["not_valid_env"] = "something"
+ mock_environ_items.return_value = envs.items()
+ config = EnvironConfig()
+ self.assertEqual(config, self.config)
+
+ @patch("os.environ.items")
+ def test_environ_config_mon(self, mock_environ_items):
+ envs = generate_os_environ_dict(self.config, "OSMMON_VCA_")
+ envs["not_valid_env"] = "something"
+ mock_environ_items.return_value = envs.items()
+ config = EnvironConfig()
+ self.assertEqual(config, self.config)
+
+
+class TestModelConfig(TestCase):
+ def setUp(self):
+ self.config = {
+ f'model_config_{model_key.replace("-", "_")}': "somevalue"
+ for model_key in MODEL_CONFIG_KEYS
+ }
+ self.config["model_config_invalid"] = "something"
+ self.model_config = {model_key: "somevalue" for model_key in MODEL_CONFIG_KEYS}
+
+ def test_model_config(self):
+ model_config = ModelConfig(self.config)
+ self.assertEqual(model_config, self.model_config)
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+from unittest import TestCase
+from unittest.mock import Mock, patch
+
+
+from n2vc.tests.unit.utils import AsyncMock
+from n2vc.vca import connection
+
+
+class TestConnection(TestCase):
+ def setUp(self):
+ self.loop = asyncio.get_event_loop()
+ self.store = AsyncMock()
+
+ def test_load_from_store(self):
+ self.loop.run_until_complete(connection.get_connection(self.store, "vim_id"))
+
+ self.store.get_vca_connection_data.assert_called_once()
+
+ def test_cloud_properties(self):
+ conn = self.loop.run_until_complete(
+ connection.get_connection(self.store, "vim_id")
+ )
+ conn._data = Mock()
+ conn._data.lxd_cloud = "name"
+ conn._data.k8s_cloud = "name"
+ conn._data.lxd_credentials = "credential"
+ conn._data.k8s_credentials = "credential"
+
+ self.assertEqual(conn.lxd_cloud.name, "name")
+ self.assertEqual(conn.lxd_cloud.credential_name, "credential")
+ self.assertEqual(conn.k8s_cloud.name, "name")
+ self.assertEqual(conn.k8s_cloud.credential_name, "credential")
+
+ @patch("n2vc.vca.connection.EnvironConfig")
+ @patch("n2vc.vca.connection_data.base64_to_cacert")
+ def test_load_from_env(self, mock_base64_to_cacert, mock_env):
+ mock_base64_to_cacert.return_value = "cacert"
+ mock_env.return_value = {
+ "endpoints": "1.2.3.4:17070",
+ "user": "user",
+ "secret": "secret",
+ "cacert": "cacert",
+ "pubkey": "pubkey",
+ "cloud": "cloud",
+ "credentials": "credentials",
+ "k8s_cloud": "k8s_cloud",
+ "k8s_credentials": "k8s_credentials",
+ "model_config": {},
+ "api-proxy": "api_proxy",
+ }
+ self.store.get_vca_endpoints.return_value = ["1.2.3.5:17070"]
+ self.loop.run_until_complete(connection.get_connection(self.store))
+ self.store.get_vca_connection_data.assert_not_called()
def test_model_watcher(self, allwatcher):
tests = Deltas
allwatcher.return_value = FakeWatcher()
+ n2vc = AsyncMock()
for test in tests:
with self.assertRaises(asyncio.TimeoutError):
allwatcher.return_value.delta_to_return = [test.delta]
test.filter.entity_type,
timeout=0,
db_dict={"something"},
- n2vc=self.n2vc,
+ n2vc=n2vc,
+ vca_id=None,
)
)
- self.assertEqual(self.n2vc.last_written_values, test.db.data)
- self.n2vc.last_written_values = None
+ n2vc.write_app_status_to_db.assert_called()
@mock.patch("n2vc.juju_watcher.asyncio.wait")
def test_wait_for(self, wait):
from n2vc.exceptions import (
MethodNotImplemented,
K8sException,
- N2VCBadArgumentsException,
)
+from n2vc.vca.connection_data import ConnectionData
class K8sJujuConnTestCase(asynctest.TestCase):
- @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
- @asynctest.mock.patch("juju.client.connector.Connector.connect")
- @asynctest.mock.patch("juju.controller.Controller.connection")
- @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert")
@asynctest.mock.patch("n2vc.k8s_juju_conn.Libjuju")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.MotorStore")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.get_connection")
+ @asynctest.mock.patch("n2vc.vca.connection_data.base64_to_cacert")
def setUp(
self,
- mock_libjuju=None,
mock_base64_to_cacert=None,
- mock_connection=None,
- mock_connect=None,
- mock_update_endpoints=None,
+ mock_get_connection=None,
+ mock_store=None,
+ mock_libjuju=None,
):
self.loop = asyncio.get_event_loop()
- mock_libjuju.return_value = AsyncMock()
- db = Mock()
- vca_config = {
- "secret": "secret",
- "api_proxy": "api_proxy",
- "cloud": "cloud",
- "k8s_cloud": "k8s_cloud",
- "user": "user",
- "host": "1.1.1.1",
- "port": 17070,
- "ca_cert": "cacert",
- }
-
+ self.db = Mock()
+ mock_base64_to_cacert.return_value = """
+ -----BEGIN CERTIFICATE-----
+ SOMECERT
+ -----END CERTIFICATE-----"""
+ mock_libjuju.return_value = Mock()
+ mock_store.return_value = AsyncMock()
+ mock_vca_connection = Mock()
+ mock_get_connection.return_value = mock_vca_connection
+ mock_vca_connection.data.return_value = ConnectionData(
+ **{
+ "endpoints": ["1.2.3.4:17070"],
+ "user": "user",
+ "secret": "secret",
+ "cacert": "cacert",
+ "pubkey": "pubkey",
+ "lxd-cloud": "cloud",
+ "lxd-credentials": "credentials",
+ "k8s-cloud": "k8s_cloud",
+ "k8s-credentials": "k8s_credentials",
+ "model-config": {},
+ "api-proxy": "api_proxy",
+ }
+ )
logging.disable(logging.CRITICAL)
self.k8s_juju_conn = K8sJujuConnector(
fs=fslocal.FsLocal(),
- db=db,
+ db=self.db,
log=None,
loop=self.loop,
- vca_config=vca_config,
- on_update_db=None,
- )
-
-
-class K8sJujuConnInitSuccessTestCase(asynctest.TestCase):
- def setUp(
- self,
- ):
- logging.disable(logging.CRITICAL)
-
- @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
- @asynctest.mock.patch("juju.client.connector.Connector.connect")
- @asynctest.mock.patch("juju.controller.Controller.connection")
- @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert")
- @asynctest.mock.patch("n2vc.libjuju.Libjuju.__init__")
- def test_success(
- self,
- mock_libjuju=None,
- mock_base64_to_cacert=None,
- mock_connection=None,
- mock_connect=None,
- mock_update_endpoints=None,
- ):
- mock_libjuju.return_value = None
- loop = asyncio.get_event_loop()
- log = logging.getLogger()
- db = Mock()
- vca_config = {
- "secret": "secret",
- "cloud": "cloud",
- "k8s_cloud": "k8s_cloud",
- "user": "user",
- "host": "1.1.1.1",
- "port": 17070,
- "ca_cert": "cacert",
- }
- K8sJujuConnector(
- fs=fslocal.FsLocal(),
- db=db,
- log=log,
- loop=self.loop,
- vca_config=vca_config,
on_update_db=None,
)
-
- mock_libjuju.assert_called_once_with(
- endpoint="1.1.1.1:17070",
- api_proxy=None, # Not needed for k8s charms
- model_config={},
- username="user",
- password="secret",
- cacert=mock_base64_to_cacert.return_value,
- loop=loop,
- log=log,
- db=db,
- )
-
-
-class K8sJujuConnectorInitFailureTestCase(asynctest.TestCase):
- def setUp(
- self,
- ):
- self.loop = asyncio.get_event_loop()
- logging.disable(logging.CRITICAL)
- self.vca_config = {
- "secret": "secret",
- "api_proxy": "api_proxy",
- "cloud": "cloud",
- "k8s_cloud": "k8s_cloud",
- "user": "user",
- "host": "1.1.1.1",
- "port": 17070,
- "ca_cert": "cacert",
- }
-
- def test_missing_vca_config_host(self):
- db = Mock()
- self.vca_config.pop("host")
- with self.assertRaises(N2VCBadArgumentsException):
- self.k8s_juju_conn = K8sJujuConnector(
- fs=fslocal.FsLocal(),
- db=db,
- log=None,
- loop=self.loop,
- vca_config=self.vca_config,
- on_update_db=None,
- )
-
- def test_missing_vca_config_user(self):
- db = Mock()
- self.vca_config.pop("user")
- with self.assertRaises(N2VCBadArgumentsException):
- self.k8s_juju_conn = K8sJujuConnector(
- fs=fslocal.FsLocal(),
- db=db,
- log=None,
- loop=self.loop,
- vca_config=self.vca_config,
- on_update_db=None,
- )
-
- def test_missing_vca_config_secret(self):
- db = Mock()
- self.vca_config.pop("secret")
- with self.assertRaises(N2VCBadArgumentsException):
- self.k8s_juju_conn = K8sJujuConnector(
- fs=fslocal.FsLocal(),
- db=db,
- log=None,
- loop=self.loop,
- vca_config=self.vca_config,
- on_update_db=None,
- )
-
- def test_missing_vca_config_ca_cert(self):
- db = Mock()
- self.vca_config.pop("ca_cert")
- with self.assertRaises(N2VCBadArgumentsException):
- self.k8s_juju_conn = K8sJujuConnector(
- fs=fslocal.FsLocal(),
- db=db,
- log=None,
- loop=self.loop,
- vca_config=self.vca_config,
- on_update_db=None,
- )
+ self.k8s_juju_conn._store.get_vca_id.return_value = None
+ self.k8s_juju_conn.libjuju = Mock()
@asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
)
)
self.assertEqual(mock_chdir.call_count, 2)
- self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
- model_name=self.kdu_instance,
- cloud_name=self.cluster_uuid,
- credential_name="cred-{}".format(self.cluster_uuid),
- )
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once()
self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
"local:{}".format(self.local_bundle),
model_name=self.kdu_instance,
timeout=1800,
)
)
- self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
- model_name=self.kdu_instance,
- cloud_name=self.cluster_uuid,
- credential_name="cred-{}".format(self.cluster_uuid),
- )
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once()
self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
self.cs_bundle,
model_name=self.kdu_instance,
timeout=1800,
)
)
- self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
- model_name=self.kdu_instance,
- cloud_name=self.cluster_uuid,
- credential_name="cred-{}".format(self.cluster_uuid),
- )
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once()
self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
self.http_bundle,
model_name=self.kdu_instance,
timeout=1800,
)
)
- self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
- model_name=self.kdu_instance,
- cloud_name=self.cluster_uuid,
- credential_name="cred-{}".format(self.cluster_uuid),
- )
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once()
self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
self.cs_bundle,
model_name=self.kdu_instance,
timeout=1800,
)
)
- self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
- model_name=self.kdu_instance,
- cloud_name=self.cluster_uuid,
- credential_name="cred-{}".format(self.cluster_uuid),
- )
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once()
self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
self.cs_bundle,
model_name=self.kdu_instance,
timeout=1800,
)
)
- self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
- model_name=self.kdu_instance,
- cloud_name=self.cluster_uuid,
- credential_name="cred-{}".format(self.cluster_uuid),
- )
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once()
self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
"local:{}".format(self.local_bundle),
model_name=self.kdu_instance,
import asyncio
import asynctest
import tempfile
-from unittest import mock
+from unittest.mock import Mock, patch
import juju
import kubernetes
from juju.errors import JujuAPIError
import logging
-from .utils import FakeN2VC, FakeMachine, FakeApplication
+from .utils import FakeMachine, FakeApplication
from n2vc.libjuju import Libjuju
from n2vc.exceptions import (
JujuControllerFailedConnecting,
JujuError,
)
from n2vc.k8s_juju_conn import generate_rbac_id
+from n2vc.tests.unit.utils import AsyncMock
+from n2vc.vca.connection import Connection
+from n2vc.vca.connection_data import ConnectionData
+cacert = """-----BEGIN CERTIFICATE-----
+SOMECERT
+-----END CERTIFICATE-----"""
+
+
+@asynctest.mock.patch("n2vc.libjuju.Controller")
class LibjujuTestCase(asynctest.TestCase):
- @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
- @asynctest.mock.patch("juju.client.connector.Connector.connect")
- @asynctest.mock.patch("juju.controller.Controller.connection")
- @asynctest.mock.patch("n2vc.libjuju.Libjuju._get_api_endpoints_db")
+ @asynctest.mock.patch("n2vc.vca.connection_data.base64_to_cacert")
def setUp(
self,
- mock__get_api_endpoints_db=None,
- mock_connection=None,
- mock_connect=None,
- mock_update_endpoints=None,
- ):
- loop = asyncio.get_event_loop()
- n2vc = FakeN2VC()
- mock__get_api_endpoints_db.return_value = ["127.0.0.1:17070"]
- endpoints = "127.0.0.1:17070"
- username = "admin"
- password = "secret"
- cacert = """
- -----BEGIN CERTIFICATE-----
- SOMECERT
- -----END CERTIFICATE-----"""
- self.libjuju = Libjuju(
- endpoints,
- "192.168.0.155:17070",
- username,
- password,
- cacert,
- loop,
- log=None,
- db={"get_one": []},
- n2vc=n2vc,
- )
- logging.disable(logging.CRITICAL)
- loop.run_until_complete(self.libjuju.disconnect())
-
-
-@asynctest.mock.patch("n2vc.libjuju.Libjuju._create_health_check_task")
-@asynctest.mock.patch("n2vc.libjuju.Libjuju._update_api_endpoints_db")
-@asynctest.mock.patch("n2vc.libjuju.Libjuju._get_api_endpoints_db")
-class LibjujuInitTestCase(asynctest.TestCase):
- def setUp(self):
+ mock_base64_to_cacert=None,
+ ):
self.loop = asyncio.get_event_loop()
- self.n2vc = FakeN2VC()
- self.endpoint = "192.168.100.100:17070"
- self.username = "admin"
- self.password = "secret"
- self.cacert = """
- -----BEGIN CERTIFICATE-----
- SOMECERT
- -----END CERTIFICATE-----"""
-
- def test_endpoint_not_in_db(
- self,
- mock__get_api_endpoints_db,
- mock_update_endpoints,
- mock_create_health_check_task,
- ):
- mock__get_api_endpoints_db.return_value = ["another_ip"]
- Libjuju(
- self.endpoint,
- "192.168.0.155:17070",
- self.username,
- self.password,
- self.cacert,
- self.loop,
- log=None,
- db={"get_one": []},
- n2vc=self.n2vc,
+ self.db = Mock()
+ mock_base64_to_cacert.return_value = cacert
+ Connection._load_vca_connection_data = Mock()
+ vca_connection = Connection(AsyncMock())
+ vca_connection._data = ConnectionData(
+ **{
+ "endpoints": ["1.2.3.4:17070"],
+ "user": "user",
+ "secret": "secret",
+ "cacert": "cacert",
+ "pubkey": "pubkey",
+ "lxd-cloud": "cloud",
+ "lxd-credentials": "credentials",
+ "k8s-cloud": "k8s_cloud",
+ "k8s-credentials": "k8s_credentials",
+ "model-config": {},
+ "api-proxy": "api_proxy",
+ }
)
- mock_update_endpoints.assert_called_once_with([self.endpoint])
- mock__get_api_endpoints_db.assert_called_once()
-
- def test_endpoint_in_db(
- self,
- mock__get_api_endpoints_db,
- mock_update_endpoints,
- mock_create_health_check_task,
- ):
- mock__get_api_endpoints_db.return_value = [self.endpoint, "another_ip"]
- Libjuju(
- self.endpoint,
- "192.168.0.155:17070",
- self.username,
- self.password,
- self.cacert,
- self.loop,
- log=None,
- db={"get_one": []},
- n2vc=self.n2vc,
- )
- mock_update_endpoints.assert_not_called()
- mock__get_api_endpoints_db.assert_called_once()
-
- def test_no_db_endpoints(
- self,
- mock__get_api_endpoints_db,
- mock_update_endpoints,
- mock_create_health_check_task,
- ):
- mock__get_api_endpoints_db.return_value = None
- Libjuju(
- self.endpoint,
- "192.168.0.155:17070",
- self.username,
- self.password,
- self.cacert,
- self.loop,
- log=None,
- db={"get_one": []},
- n2vc=self.n2vc,
- )
- mock_update_endpoints.assert_called_once_with([self.endpoint])
- mock__get_api_endpoints_db.assert_called_once()
+ logging.disable(logging.CRITICAL)
+ self.libjuju = Libjuju(vca_connection, self.loop)
+ self.loop.run_until_complete(self.libjuju.disconnect())
@asynctest.mock.patch("juju.controller.Controller.connect")
"juju.controller.Controller.api_endpoints",
new_callable=asynctest.CoroutineMock(return_value=["127.0.0.1:17070"]),
)
-@asynctest.mock.patch("n2vc.libjuju.Libjuju._update_api_endpoints_db")
class GetControllerTest(LibjujuTestCase):
def setUp(self):
super(GetControllerTest, self).setUp()
- def test_diff_endpoint(
- self, mock__update_api_endpoints_db, mock_api_endpoints, mock_connect
- ):
+ def test_diff_endpoint(self, mock_api_endpoints, mock_connect):
self.libjuju.endpoints = []
controller = self.loop.run_until_complete(self.libjuju.get_controller())
- mock__update_api_endpoints_db.assert_called_once_with(["127.0.0.1:17070"])
self.assertIsInstance(controller, juju.controller.Controller)
@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
def test_exception(
self,
mock_disconnect_controller,
- mock__update_api_endpoints_db,
mock_api_endpoints,
mock_connect,
):
self.libjuju.endpoints = []
- mock__update_api_endpoints_db.side_effect = Exception()
+
+ mock_connect.side_effect = Exception()
controller = None
with self.assertRaises(JujuControllerFailedConnecting):
controller = self.loop.run_until_complete(self.libjuju.get_controller())
self.assertIsNone(controller)
mock_disconnect_controller.assert_called_once()
- def test_same_endpoint_get_controller(
- self, mock__update_api_endpoints_db, mock_api_endpoints, mock_connect
- ):
+ def test_same_endpoint_get_controller(self, mock_api_endpoints, mock_connect):
self.libjuju.endpoints = ["127.0.0.1:17070"]
controller = self.loop.run_until_complete(self.libjuju.get_controller())
- mock__update_api_endpoints_db.assert_not_called()
self.assertIsInstance(controller, juju.controller.Controller)
mock_model_exists.return_value = True
# This should not raise an exception
- self.loop.run_until_complete(
- self.libjuju.add_model("existing_model", "cloud")
- )
+ self.loop.run_until_complete(self.libjuju.add_model("existing_model", "cloud"))
mock_disconnect_controller.assert_called()
mock_get_controller.return_value = juju.controller.Controller()
self.loop.run_until_complete(
- self.libjuju.add_model("nonexisting_model", "cloud")
+ self.libjuju.add_model("nonexisting_model", Mock())
)
mock_add_model.assert_called_once()
mock_configuration.key_file = None
self.token = None
self.cert_data = None
- with mock.patch.object(self.libjuju.log, "debug") as mock_debug:
+ with patch.object(self.libjuju.log, "debug") as mock_debug:
credential = self.libjuju.get_k8s_cloud_credential(
mock_configuration,
self.cert_data,
from n2vc.n2vc_juju_conn import N2VCJujuConnector
from osm_common import fslocal
from n2vc.exceptions import (
- JujuK8sProxycharmNotSupported,
N2VCBadArgumentsException,
N2VCException,
)
+from n2vc.tests.unit.utils import AsyncMock
+from n2vc.vca.connection_data import ConnectionData
class N2VCJujuConnTestCase(asynctest.TestCase):
- @asynctest.mock.patch("n2vc.libjuju.Libjuju._create_health_check_task")
- @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
- @asynctest.mock.patch("juju.client.connector.Connector.connect")
- @asynctest.mock.patch("juju.controller.Controller.connection")
- @asynctest.mock.patch("n2vc.libjuju.Libjuju._get_api_endpoints_db")
+ @asynctest.mock.patch("n2vc.n2vc_juju_conn.MotorStore")
+ @asynctest.mock.patch("n2vc.n2vc_juju_conn.get_connection")
+ @asynctest.mock.patch("n2vc.vca.connection_data.base64_to_cacert")
def setUp(
self,
- mock__get_api_endpoints_db=None,
- mock_connection=None,
- mock_connect=None,
- mock_update_endpoints=None,
- mock__create_health_check_task=None,
+ mock_base64_to_cacert=None,
+ mock_get_connection=None,
+ mock_store=None,
):
- mock__get_api_endpoints_db.return_value = ["2.2.2.2:17070"]
- loop = asyncio.get_event_loop()
- db = {}
- vca_config = {
- "secret": "secret",
- "api_proxy": "api_proxy",
- "cloud": "cloud",
- "k8s_cloud": "k8s_cloud",
- }
-
+ self.loop = asyncio.get_event_loop()
+ self.db = Mock()
+ mock_base64_to_cacert.return_value = """
+ -----BEGIN CERTIFICATE-----
+ SOMECERT
+ -----END CERTIFICATE-----"""
+ mock_store.return_value = AsyncMock()
+ mock_vca_connection = Mock()
+ mock_get_connection.return_value = mock_vca_connection
+ mock_vca_connection.data.return_value = ConnectionData(
+ **{
+ "endpoints": ["1.2.3.4:17070"],
+ "user": "user",
+ "secret": "secret",
+ "cacert": "cacert",
+ "pubkey": "pubkey",
+ "lxd-cloud": "cloud",
+ "lxd-credentials": "credentials",
+ "k8s-cloud": "k8s_cloud",
+ "k8s-credentials": "k8s_credentials",
+ "model-config": {},
+ "api-proxy": "api_proxy",
+ }
+ )
logging.disable(logging.CRITICAL)
N2VCJujuConnector.get_public_key = Mock()
self.n2vc = N2VCJujuConnector(
- db=db,
+ db=self.db,
fs=fslocal.FsLocal(),
log=None,
- loop=loop,
- url="2.2.2.2:17070",
- username="admin",
- vca_config=vca_config,
+ loop=self.loop,
on_update_db=None,
)
N2VCJujuConnector.get_public_key.assert_not_called()
+ self.n2vc.libjuju = Mock()
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_metrics")
class GetMetricssTest(N2VCJujuConnTestCase):
def setUp(self):
super(GetMetricssTest, self).setUp()
+ self.n2vc.libjuju.get_metrics = AsyncMock()
- def test_success(self, mock_get_metrics):
+ def test_success(self):
_ = self.loop.run_until_complete(self.n2vc.get_metrics("model", "application"))
- mock_get_metrics.assert_called_once()
+ self.n2vc.libjuju.get_metrics.assert_called_once()
- def test_except(self, mock_get_metrics):
- mock_get_metrics.side_effect = Exception()
+ def test_except(self):
+ self.n2vc.libjuju.get_metrics.side_effect = Exception()
with self.assertRaises(Exception):
_ = self.loop.run_until_complete(
self.n2vc.get_metrics("model", "application")
)
- mock_get_metrics.assert_called_once()
+ self.n2vc.libjuju.get_metrics.assert_called_once()
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_executed_actions")
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_actions")
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_application_configs")
-@asynctest.mock.patch("n2vc.libjuju.Libjuju._get_application")
class UpdateVcaStatusTest(N2VCJujuConnTestCase):
def setUp(self):
super(UpdateVcaStatusTest, self).setUp()
+ self.n2vc.libjuju.get_controller = AsyncMock()
+ self.n2vc.libjuju.get_model = AsyncMock()
+ self.n2vc.libjuju.get_executed_actions = AsyncMock()
+ self.n2vc.libjuju.get_actions = AsyncMock()
+ self.n2vc.libjuju.get_application_configs = AsyncMock()
+ self.n2vc.libjuju._get_application = AsyncMock()
def test_success(
self,
- mock_get_application,
- mock_get_application_configs,
- mock_get_actions,
- mock_get_executed_actions,
- mock_get_model,
- mock_get_controller,
):
- self.loop.run_until_complete(self.n2vc.update_vca_status(
- {"model": {"applications": {"app": {"actions": {}}}}}))
- mock_get_executed_actions.assert_called_once()
- mock_get_actions.assert_called_once()
- mock_get_application_configs.assert_called_once()
+ self.loop.run_until_complete(
+ self.n2vc.update_vca_status(
+ {"model": {"applications": {"app": {"actions": {}}}}}
+ )
+ )
+ self.n2vc.libjuju.get_executed_actions.assert_called_once()
+ self.n2vc.libjuju.get_actions.assert_called_once()
+ self.n2vc.libjuju.get_application_configs.assert_called_once()
- def test_exception(
- self,
- mock_get_application,
- mock_get_application_configs,
- mock_get_actions,
- mock_get_executed_actions,
- mock_get_model,
- mock_get_controller,
- ):
- mock_get_model.return_value = None
- mock_get_executed_actions.side_effect = Exception()
+ def test_exception(self):
+ self.n2vc.libjuju.get_model.return_value = None
+ self.n2vc.libjuju.get_executed_actions.side_effect = Exception()
with self.assertRaises(Exception):
- self.loop.run_until_complete(self.n2vc.update_vca_status(
- {"model": {"applications": {"app": {"actions": {}}}}}))
- mock_get_executed_actions.assert_not_called()
- mock_get_actions.assert_not_called_once()
- mock_get_application_configs.assert_not_called_once()
+ self.loop.run_until_complete(
+ self.n2vc.update_vca_status(
+ {"model": {"applications": {"app": {"actions": {}}}}}
+ )
+ )
+ self.n2vc.libjuju.get_executed_actions.assert_not_called()
+ self.n2vc.libjuju.get_actions.assert_not_called_once()
+ self.n2vc.libjuju.get_application_configs.assert_not_called_once()
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.model_exists")
@asynctest.mock.patch("osm_common.fslocal.FsLocal.file_exists")
@asynctest.mock.patch(
"osm_common.fslocal.FsLocal.path", new_callable=asynctest.PropertyMock, create=True
)
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.deploy_charm")
-@asynctest.mock.patch("n2vc.libjuju.Libjuju.add_model")
class K8sProxyCharmsTest(N2VCJujuConnTestCase):
def setUp(self):
super(K8sProxyCharmsTest, self).setUp()
+ self.n2vc.libjuju.model_exists = AsyncMock()
+ self.n2vc.libjuju.add_model = AsyncMock()
+ self.n2vc.libjuju.deploy_charm = AsyncMock()
+ self.n2vc.libjuju.model_exists.return_value = False
def test_success(
- self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists
+ self,
+ mock_path,
+ mock_file_exists,
):
- mock_model_exists.return_value = None
mock_file_exists.return_value = True
mock_path.return_value = "/path"
ee_id = self.loop.run_until_complete(
self.n2vc.install_k8s_proxy_charm(
- "charm", "nsi-id.ns-id.vnf-id.vdu", "////path/", {},
+ "charm",
+ "nsi-id.ns-id.vnf-id.vdu",
+ "////path/",
+ {},
)
)
- mock_add_model.assert_called_once_with(
- "ns-id-k8s",
- cloud_name=self.n2vc.k8s_cloud,
- credential_name=self.n2vc.k8s_cloud
- )
- mock_deploy_charm.assert_called_once_with(
+ self.n2vc.libjuju.add_model.assert_called_once()
+ self.n2vc.libjuju.deploy_charm.assert_called_once_with(
model_name="ns-id-k8s",
application_name="app-vnf-vnf-id-vdu-vdu",
path="/path/path/",
)
self.assertEqual(ee_id, "ns-id-k8s.app-vnf-vnf-id-vdu-vdu.k8s")
- @asynctest.mock.patch(
- "n2vc.n2vc_juju_conn.N2VCJujuConnector.k8s_cloud",
- new_callable=asynctest.PropertyMock,
- create=True,
- )
- def test_no_k8s_cloud(
+ def test_no_artifact_path(
self,
- mock_k8s_cloud,
- mock_add_model,
- mock_deploy_charm,
mock_path,
mock_file_exists,
- mock_model_exists,
- ):
- mock_k8s_cloud.return_value = None
- with self.assertRaises(JujuK8sProxycharmNotSupported):
- ee_id = self.loop.run_until_complete(
- self.n2vc.install_k8s_proxy_charm(
- "charm", "nsi-id.ns-id.vnf-id.vdu", "/path/", {},
- )
- )
- self.assertIsNone(ee_id)
-
- def test_no_artifact_path(
- self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists,
):
with self.assertRaises(N2VCBadArgumentsException):
ee_id = self.loop.run_until_complete(
self.n2vc.install_k8s_proxy_charm(
- "charm", "nsi-id.ns-id.vnf-id.vdu", "", {},
+ "charm",
+ "nsi-id.ns-id.vnf-id.vdu",
+ "",
+ {},
)
)
self.assertIsNone(ee_id)
def test_no_db(
- self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists,
+ self,
+ mock_path,
+ mock_file_exists,
):
with self.assertRaises(N2VCBadArgumentsException):
ee_id = self.loop.run_until_complete(
self.n2vc.install_k8s_proxy_charm(
- "charm", "nsi-id.ns-id.vnf-id.vdu", "/path/", None,
+ "charm",
+ "nsi-id.ns-id.vnf-id.vdu",
+ "/path/",
+ None,
)
)
self.assertIsNone(ee_id)
def test_file_not_exists(
- self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists,
+ self,
+ mock_path,
+ mock_file_exists,
):
mock_file_exists.return_value = False
with self.assertRaises(N2VCBadArgumentsException):
ee_id = self.loop.run_until_complete(
self.n2vc.install_k8s_proxy_charm(
- "charm", "nsi-id.ns-id.vnf-id.vdu", "/path/", {},
+ "charm",
+ "nsi-id.ns-id.vnf-id.vdu",
+ "/path/",
+ {},
)
)
self.assertIsNone(ee_id)
def test_exception(
- self, mock_add_model, mock_deploy_charm, mock_path, mock_file_exists, mock_model_exists,
+ self,
+ mock_path,
+ mock_file_exists,
):
- mock_model_exists.return_value = None
mock_file_exists.return_value = True
mock_path.return_value = "/path"
- mock_deploy_charm.side_effect = Exception()
+ self.n2vc.libjuju.deploy_charm.side_effect = Exception()
with self.assertRaises(N2VCException):
ee_id = self.loop.run_until_complete(
self.n2vc.install_k8s_proxy_charm(
- "charm", "nsi-id.ns-id.vnf-id.vdu", "path/", {},
+ "charm",
+ "nsi-id.ns-id.vnf-id.vdu",
+ "path/",
+ {},
)
)
self.assertIsNone(ee_id)
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+from base64 import b64decode
+from unittest import TestCase
+from unittest.mock import Mock, patch
+
+
+from n2vc.store import DbMongoStore, MotorStore
+from n2vc.vca.connection_data import ConnectionData
+from n2vc.tests.unit.utils import AsyncMock
+from osm_common.dbmongo import DbException
+
+
+class TestDbMongoStore(TestCase):
+ def setUp(self):
+ self.store = DbMongoStore(Mock())
+ self.loop = asyncio.get_event_loop()
+
+ @patch("n2vc.vca.connection_data.base64_to_cacert")
+ def test_get_vca_connection_data(self, mock_base64_to_cacert):
+ mock_base64_to_cacert.return_value = "cacert"
+ conn_data = {
+ "endpoints": ["1.2.3.4:17070"],
+ "user": "admin",
+ "secret": "1234",
+ "cacert": "cacert",
+ "pubkey": "pubkey",
+ "lxd-cloud": "lxd-cloud",
+ "lxd-credentials": "lxd-credentials",
+ "k8s-cloud": "k8s-cloud",
+ "k8s-credentials": "k8s-credentials",
+ "model-config": {},
+ "api-proxy": None,
+ }
+ db_get_one = conn_data.copy()
+ db_get_one.update({"schema_version": "1.1", "_id": "id"})
+ self.store.db.get_one.return_value = db_get_one
+ connection_data = self.loop.run_until_complete(
+ self.store.get_vca_connection_data("vca_id")
+ )
+ self.assertTrue(
+ all(
+ connection_data.__dict__[k.replace("-", "_")] == v
+ for k, v in conn_data.items()
+ )
+ )
+
+ def test_update_vca_endpoints(self):
+ endpoints = ["1.2.3.4:17070"]
+ self.store.db.get_one.side_effect = [None, {"api_endpoints": []}]
+ self.store.db.create.side_effect = DbException("already exists")
+ self.loop.run_until_complete(self.store.update_vca_endpoints(endpoints))
+ self.assertEqual(self.store.db.get_one.call_count, 2)
+ Mock()
+ self.store.db.set_one.assert_called_once_with(
+ "vca", {"_id": "juju"}, {"api_endpoints": endpoints}
+ )
+
+ def test_update_vca_endpoints_exception(self):
+ endpoints = ["1.2.3.4:17070"]
+ self.store.db.get_one.side_effect = [None, None]
+ self.store.db.create.side_effect = DbException("already exists")
+ with self.assertRaises(DbException):
+ self.loop.run_until_complete(self.store.update_vca_endpoints(endpoints))
+ self.assertEqual(self.store.db.get_one.call_count, 2)
+ self.store.db.set_one.assert_not_called()
+
+ def test_update_vca_endpoints_with_vca_id(self):
+ endpoints = ["1.2.3.4:17070"]
+ self.store.db.get_one.return_value = {}
+ self.loop.run_until_complete(
+ self.store.update_vca_endpoints(endpoints, "vca_id")
+ )
+ self.store.db.get_one.assert_called_once_with("vca", q_filter={"_id": "vca_id"})
+ self.store.db.replace.assert_called_once_with(
+ "vca", "vca_id", {"endpoints": endpoints}
+ )
+
+ def test_get_vca_endpoints(self):
+ endpoints = ["1.2.3.4:17070"]
+ db_data = {"api_endpoints": endpoints}
+ db_returns = [db_data, None]
+ expected_returns = [endpoints, []]
+ returns = []
+ self.store._get_juju_info = Mock()
+ self.store._get_juju_info.side_effect = db_returns
+ for _ in range(len(db_returns)):
+ e = self.loop.run_until_complete(self.store.get_vca_endpoints())
+ returns.append(e)
+ self.assertEqual(expected_returns, returns)
+
+ @patch("n2vc.vca.connection_data.base64_to_cacert")
+ def test_get_vca_endpoints_with_vca_id(self, mock_base64_to_cacert):
+ expected_endpoints = ["1.2.3.4:17070"]
+ mock_base64_to_cacert.return_value = "cacert"
+ self.store.get_vca_connection_data = Mock()
+ self.store.get_vca_connection_data.return_value = ConnectionData(
+ **{
+ "endpoints": expected_endpoints,
+ "user": "admin",
+ "secret": "1234",
+ "cacert": "cacert",
+ }
+ )
+ endpoints = self.loop.run_until_complete(self.store.get_vca_endpoints("vca_id"))
+ self.store.get_vca_connection_data.assert_called_with("vca_id")
+ self.assertEqual(expected_endpoints, endpoints)
+
+ def test_get_vca_id(self):
+ self.assertIsNone(self.loop.run_until_complete(self.store.get_vca_id()))
+
+ def test_get_vca_id_with_vim_id(self):
+ self.store.db.get_one.return_value = {"vca": "vca_id"}
+ vca_id = self.loop.run_until_complete(self.store.get_vca_id("vim_id"))
+ self.store.db.get_one.assert_called_once_with(
+ "vim_accounts", q_filter={"_id": "vim_id"}, fail_on_empty=False
+ )
+ self.assertEqual(vca_id, "vca_id")
+
+
+class TestMotorStore(TestCase):
+ def setUp(self):
+ self.store = MotorStore("uri")
+ self.vca_collection = Mock()
+ self.vca_collection.find_one = AsyncMock()
+ self.vca_collection.insert_one = AsyncMock()
+ self.vca_collection.replace_one = AsyncMock()
+ self.admin_collection = Mock()
+ self.admin_collection.find_one = AsyncMock()
+ self.admin_collection.insert_one = AsyncMock()
+ self.admin_collection.replace_one = AsyncMock()
+ self.vim_accounts_collection = Mock()
+ self.vim_accounts_collection.find_one = AsyncMock()
+ self.store._client = {
+ "osm": {
+ "vca": self.vca_collection,
+ "admin": self.admin_collection,
+ "vim_accounts": self.vim_accounts_collection,
+ }
+ }
+ self.store._config = {"database_commonkey": "osm"}
+ # self.store.decrypt_fields = Mock()
+ self.loop = asyncio.get_event_loop()
+
+ @patch("n2vc.vca.connection_data.base64_to_cacert")
+ def test_get_vca_connection_data(self, mock_base64_to_cacert):
+ mock_base64_to_cacert.return_value = "cacert"
+ conn_data = {
+ "endpoints": ["1.2.3.4:17070"],
+ "user": "admin",
+ "secret": "1234",
+ "cacert": "cacert",
+ "pubkey": "pubkey",
+ "lxd-cloud": "lxd-cloud",
+ "lxd-credentials": "lxd-credentials",
+ "k8s-cloud": "k8s-cloud",
+ "k8s-credentials": "k8s-credentials",
+ "model-config": {},
+ "api-proxy": None,
+ }
+ db_find_one = conn_data.copy()
+ db_find_one.update({"schema_version": "1.1", "_id": "id"})
+ self.vca_collection.find_one.return_value = db_find_one
+ self.store.decrypt_fields = AsyncMock()
+ connection_data = self.loop.run_until_complete(
+ self.store.get_vca_connection_data("vca_id")
+ )
+ self.assertTrue(
+ all(
+ connection_data.__dict__[k.replace("-", "_")] == v
+ for k, v in conn_data.items()
+ )
+ )
+
+ @patch("n2vc.vca.connection_data.base64_to_cacert")
+ def test_get_vca_connection_data_exception(self, mock_base64_to_cacert):
+ mock_base64_to_cacert.return_value = "cacert"
+ self.vca_collection.find_one.return_value = None
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(self.store.get_vca_connection_data("vca_id"))
+
+ def test_update_vca_endpoints(self):
+ endpoints = ["1.2.3.4:17070"]
+ self.admin_collection.find_one.side_effect = [None, {"api_endpoints": []}]
+ self.admin_collection.insert_one.side_effect = DbException("already exists")
+ self.loop.run_until_complete(self.store.update_vca_endpoints(endpoints))
+ self.assertEqual(self.admin_collection.find_one.call_count, 2)
+ self.admin_collection.replace_one.assert_called_once_with(
+ {"_id": "juju"}, {"api_endpoints": ["1.2.3.4:17070"]}
+ )
+
+ def test_get_vca_connection_data_with_id(self):
+ secret = "e7b253af37785045d1ca08b8d929e556"
+ encrypted_secret = "kI46kRJh828ExSNpr16OG/q5a5/qTsE0bsHrv/W/2/g="
+ cacert = "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ4ekNDQWx1Z0F3SUJBZ0lVRWlzTTBoQWxiYzQ0Z1ZhZWh6bS80ZUsyNnRZd0RRWUpLb1pJaHZjTkFRRUwKQlFBd0lURU5NQXNHQTFVRUNoTUVTblZxZFRFUU1BNEdBMVVFQXhNSGFuVnFkUzFqWVRBZUZ3MHlNVEEwTWpNeApNRFV3TXpSYUZ3MHpNVEEwTWpNeE1EVTFNelJhTUNFeERUQUxCZ05WQkFvVEJFcDFhblV4RURBT0JnTlZCQU1UCkIycDFhblV0WTJFd2dnR2lNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJqd0F3Z2dHS0FvSUJnUUNhTmFvNGZab2gKTDJWYThtdy9LdCs3RG9tMHBYTlIvbEUxSHJyVmZvbmZqZFVQV01zSHpTSjJZZXlXcUNSd3BiaHlLaE82N1c1dgpUY2RsV3Y3WGFLTGtsdVkraDBZY3BQT3BFTmZZYmxrNGk0QkV1L0wzYVY5MFFkUFFrMG94S01CS2R5QlBNZVNNCkJmS2pPWXdyOGgzM0ZWUWhmVkJnMXVGZ2tGaDdTamNuNHczUFdvc1BCMjNiVHBCbGR3VE9zemN4Qm9TaDNSVTkKTzZjb3lQdDdEN0drOCtHRlA3RGRUQTdoV1RkaUM4cDBkeHp2RUNmY0psMXNFeFEyZVprS1QvVzZyelNtVDhUTApCM0ErM1FDRDhEOEVsQU1IVy9zS25SeHphYU8welpNVmVlQnRnNlFGZ1F3M0dJMGo2ZTY0K2w3VExoOW8wSkZVCjdpUitPY01xUzVDY0NROGpWV3JPSk9Xc2dEbDZ4T2FFREczYnR5SVJHY29jbVcvcEZFQjNZd1A2S1BRTUIrNXkKWDdnZExEWmFGRFBVakZmblhkMnhHdUZlMnpRTDNVbXZEUkZuUlBBaW02QlpQbWo1OFh2emFhZXROa3lyaUZLZwp4Z0Z1dVpTcDUwV2JWdjF0MkdzOTMrRE53NlhFZHRFYnlWWUNBa28xTTY0MkozczFnN3NoQnRFQ0F3RUFBYU1qCk1DRXdEZ1lEVlIwUEFRSC9CQVFEQWdLa01BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0RRWUpLb1pJaHZjTkFRRUwKQlFBRGdnR0JBRXYxM2o2ZGFVbDBqeERPSnNTV1ZJZS9JdXNXVTRpN2ZXSWlqMHAwRU1GNS9LTE8yemRndTR5SQoreVd2T3N5aVFPanEzMlRYVlo2bTRDSnBkR1dGVE5HK2lLdXVOU3M0N3g3Q3dmVUNBWm5VVzhyamd3ZWJyS3BmCkJMNEVQcTZTcW0rSmltN0VPankyMWJkY2cyUXdZb3A3eUhvaHcveWEvL0l6RTMzVzZxNHlJeEFvNDBVYUhPTEMKTGtGbnNVYitjcFZBeFlPZGp6bjFzNWhnclpuWXlETEl3WmtIdFdEWm94alUzeC9jdnZzZ1FzLytzTWYrRFU4RgpZMkJKRHJjQ1VQM2xzclc0QVpFMFplZkEwOTlncFEvb3dSN0REYnMwSjZUeFM4NGt6Tldjc1FuWnRraXZheHJNClkyVHNnaWVndFExVFdGRWpxLy9sUFV4emJCdmpnd1FBZm5CQXZGeVNKejdTa0VuVm5rUXJGaUlUQVArTHljQVIKMlg4UFI2ZGI1bEt0SitBSENDM3kvZmNQS2k0ZzNTL3djeXRRdmdvOXJ6ODRFalp5YUNTaGJXNG9jNzNrMS9RcAowQWtHRDU0ZGVDWWVPYVJNbW96c0w3ZzdxWkpFekhtODdOcVBYSy9EZFoweWNxaVFhMXY2T3QxNjdXNUlzMUkzCjBWb0IzUzloSlE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCgo=" # noqa: E501
+ encrypted_cacert = "QeV4evTLXzcKwZZvmXQ/OvSHToXH3ISwfoLmU+Q9JlQWAFUHSJ9IhO0ewaQrJmx3NkfFb7NCxsQhh+wE57zDW4rWgn4w/SWkzvwSi1h2xYOO3ECEHzzVqgUm15Sk0xaj1Fv9Ed4hipf6PRijeOZ7A1G9zekr1w9WIvebMyJZrK+f6QJ8AP20NUZqG/3k+MeJr3kjrl+8uwU5aPOrHAexSQGAqSKTkWzW7glmlyMWTjwkuSgNVgFg0ctdWTZ5JnNwxXbpjwIKrC4E4sIHcxko2vsTeLF8pZFPk+3QUZIg8BrgtyM3lJC2kO1g3emPQhCIk3VDb5GBgssc/GyFyRXNS651d5BNgcABOKZ4Rv/gGnprB35zP7TKJKkST44XJTEBiugWMkSZg+T9H98/l3eE34O6thfTZXgIyG+ZM6uGlW2XOce0OoEIyJiEL039WJe3izjbD3b9sCCdgQc0MgS+hTaayJI6oCUWPsJLmRji19jLi/wjOsU5gPItCFWw3pBye/A4Zf8Hxm+hShvqBnk8R2yx1fPTiyw/Zx4Jn8m49XQJyjDSZnhIck0PVHR9xWzKCr++PKljLMLdkdFxVRVPFQk/FBbesqofjSXsq9DASY6ACTL3Jmignx2OXD6ac4SlBqCTjV2dIM0yEgZF7zwMNCtppRdXTV8S29JP4W2mfaiqXCUSRTggv8EYU+9diCE+8sPB6HjuLrsfiySbFlYR2m4ysDGXjsVx5CDAf0Nh4IRfcSceYnnBGIQ2sfgGcJFOZoJqr/QeE2NWz6jlWYbWT7MjS/0decpKxP7L88qrR+F48WXQvfsvjWgKjlMKw7lHmFF8FeY836VWWICTRZx+y6IlY1Ys2ML4kySF27Hal4OPhOOoBljMNMVwUEvBulOnKUWw4BGz8eGCl8Hw6tlyJdC7kcBj/aCyNCR/NnuDk4Wck6e//He8L6mS83OJi/hIFc8vYQxnCJMXj9Ou7wr5hxtBnvxXzZM3kFHxCDO24Cd5UyBV9GD8TiQJfBGAy7a2BCBMb5ESVX8NOkyyv2hXMHOjpnKhUM9yP3Ke4CBImO7mCKJNHdFVtAmuyVKJ+jT6ooAAArkX2xwEAvBEpvGNmW2jgs6wxSuKY0h5aUm0rA4v/s8fqSZhzdInB54sMldyAnt9G+9e+g933DfyA/tkc56Ed0vZ/XEvTkThVHyUbfYR/Gjsoab1RpnDBi4aZ2E7iceoBshy+L6NXdL0jlWEs4ZubiWlbVNWlN/MqJcjV/quLU7q4HtkG0MDEFm6To3o48x7xpv8otih6YBduNqBFnwQ6Qz9rM2chFgOR4IgNSZKPxHO0AGCi1gnK/CeCvrSfWYAMn+2rmw0hMZybqKMStG28+rXsKDdqmy6vAwL/+dJwkAW+ix68rWRXpeqHlWidu4SkIBELuwEkFIC/GJU/DRvcN2GG9uP1m+VFifCIS2UdiO4OVrP6PVoW1O+jBJvFH3K1YT7CRqevb9OzjS9fO1wjkOff0W8zZyJK9Mp25aynpf0k3oMpZDpjnlOsFXFUb3N6SvXD1Yi95szIlmsr5yRYaeGUJH7/SAmMr8R6RqsCR0ANptL2dtRoGPi/qcDQE15vnjJ+QMYCg9KbCdV+Qq5di93XAjmwPj6tKZv0aXQuaTZgYR7bdLmAnJaFLbHWcQG1k6F/vdKNEb7llLsoAD9KuKXPZT/LErIyKcI0RZySy9yvhTZb4jQWn17b83yfvqfd5/2NpcyaY4gNERhDRJHw7VhoS5Leai5ZnFaO3C1vU9tIJ85XgCUASTsBLoQWVCKPSQZGxzF7PVLnHui3YA5OsOQpVqAPtgGZ12tP9XkEKj+u2/Atj2bgYrqBF7zUL64X/AQpwr/UElWDhJLSD/KStVeDOUx3AwAVVi9eTUJr6NiNMutCE1sqUf9XVIddgZ/BaG5t3NV2L+T+11QzAl+Xrh8wH/XeUCTmnU3NGkvCz/9Y7PMS+qQL7T7WeGdYmEhb5s/5p/yjSYeqybr5sANOHs83OdeSXbop9cLWW+JksHmS//rHHcrrJhZgCb3P0EOpEoEMCarT6sJq0V1Hwf/YNFdJ9V7Ac654ALS+a9ffNthMUEJeY21QMtNOrEg3QH5RWBPn+yOYN/f38tzwlT1k6Ec94y/sBmeQVv8rRzkkiMSXeAL5ATdJntq8NQq5JbvLQDNnZnHQthZt+uhcUf08mWlRrxxBUaE6xLppgMqFdYSjLGvgn/d8FZ9y7UCg5ZBhgP1rrRQL1COpNKKlJLf5laqwiGAucIDmzSbhO+MidSauDLWuv+fsdd2QYk98PHxqNrPYLrlAlABFi3JEApBm4IlrGbHxKg6dRiy7L1c9xWnAD7E3XrZrSc6DXvGRsjMXWoQdlp4CX5H3cdH9sjIE6akWqiwwrOP6QTbJcxmJGv/MVhsDVrVKmrKSn2H0/Us1fyYCHCOyCSc2L96uId8i9wQO1NXj+1PJmUq3tJ8U0TUwTblOEQdYej99xEI8EzsXLjNJHCgbDygtHBYd/SHToXH3ISwfoLmU+Q9JlS1woaUpVa5sdvbsr4BXR6J" # noqa: E501
+
+ self.vca_collection.find_one.return_value = {
+ "_id": "2ade7f0e-9b58-4dbd-93a3-4ec076185d39",
+ "schema_version": "1.11",
+ "endpoints": [],
+ "user": "admin",
+ "secret": encrypted_secret,
+ "cacert": encrypted_cacert,
+ }
+ self.admin_collection.find_one.return_value = {
+ "serial": b"l+U3HDp9td+UjQ+AN+Ypj/Uh7n3C+rMJueQNNxkIpWI="
+ }
+ connection_data = self.loop.run_until_complete(
+ self.store.get_vca_connection_data("vca_id")
+ )
+ self.assertEqual(connection_data.endpoints, [])
+ self.assertEqual(connection_data.user, "admin")
+ self.assertEqual(connection_data.secret, secret)
+ self.assertEqual(
+ connection_data.cacert, b64decode(cacert.encode("utf-8")).decode("utf-8")
+ )
+
+ def test_update_vca_endpoints_exception(self):
+ endpoints = ["1.2.3.4:17070"]
+ self.admin_collection.find_one.side_effect = [None, None]
+ self.admin_collection.insert_one.side_effect = DbException("already exists")
+ with self.assertRaises(DbException):
+ self.loop.run_until_complete(self.store.update_vca_endpoints(endpoints))
+ self.assertEqual(self.admin_collection.find_one.call_count, 2)
+ self.admin_collection.replace_one.assert_not_called()
+
+ def test_update_vca_endpoints_with_vca_id(self):
+ endpoints = ["1.2.3.4:17070"]
+ self.vca_collection.find_one.return_value = {}
+ self.loop.run_until_complete(
+ self.store.update_vca_endpoints(endpoints, "vca_id")
+ )
+ self.vca_collection.find_one.assert_called_once_with({"_id": "vca_id"})
+ self.vca_collection.replace_one.assert_called_once_with(
+ {"_id": "vca_id"}, {"endpoints": endpoints}
+ )
+
+ def test_get_vca_endpoints(self):
+ endpoints = ["1.2.3.4:17070"]
+ db_data = {"api_endpoints": endpoints}
+ db_returns = [db_data, None]
+ expected_returns = [endpoints, []]
+ returns = []
+ self.admin_collection.find_one.side_effect = db_returns
+ for _ in range(len(db_returns)):
+ e = self.loop.run_until_complete(self.store.get_vca_endpoints())
+ returns.append(e)
+ self.assertEqual(expected_returns, returns)
+
+ @patch("n2vc.vca.connection_data.base64_to_cacert")
+ def test_get_vca_endpoints_with_vca_id(self, mock_base64_to_cacert):
+ expected_endpoints = ["1.2.3.4:17070"]
+ mock_base64_to_cacert.return_value = "cacert"
+ self.store.get_vca_connection_data = AsyncMock()
+ self.store.get_vca_connection_data.return_value = ConnectionData(
+ **{
+ "endpoints": expected_endpoints,
+ "user": "admin",
+ "secret": "1234",
+ "cacert": "cacert",
+ }
+ )
+ endpoints = self.loop.run_until_complete(self.store.get_vca_endpoints("vca_id"))
+ self.store.get_vca_connection_data.assert_called_with("vca_id")
+ self.assertEqual(expected_endpoints, endpoints)
+
+ def test_get_vca_id(self):
+ self.assertIsNone(self.loop.run_until_complete((self.store.get_vca_id())))
+
+ def test_get_vca_id_with_vim_id(self):
+ self.vim_accounts_collection.find_one.return_value = {"vca": "vca_id"}
+ vca_id = self.loop.run_until_complete(self.store.get_vca_id("vim_id"))
+ self.vim_accounts_collection.find_one.assert_called_once_with({"_id": "vim_id"})
+ self.assertEqual(vca_id, "vca_id")
from unittest import TestCase
-from n2vc.utils import Dict, EntityType, JujuStatusToOSM, N2VCDeploymentStatus, DB_DATA
+from n2vc.utils import Dict, EntityType, JujuStatusToOSM, N2VCDeploymentStatus
from juju.machine import Machine
from juju.application import Application
from juju.action import Action
osm_status = status["osm"]
self.assertTrue(juju_status in JujuStatusToOSM[entity_type])
self.assertEqual(osm_status, JujuStatusToOSM[entity_type][juju_status])
-
- def test_db_data(self):
- self.assertEqual(DB_DATA.api_endpoints.table, "admin")
- self.assertEqual(DB_DATA.api_endpoints.filter, {"_id": "juju"})
- self.assertEqual(DB_DATA.api_endpoints.key, "api_endpoints")
detailed_status: str,
vca_status: str,
entity_type: str,
+ vca_id: str = None,
):
+ """
+ Write application status to database
+
+ :param: db_dict: DB dictionary
+ :param: status: Status of the application
+ :param: detailed_status: Detailed status
+ :param: vca_status: VCA status
+ :param: entity_type: Entity type ("application", "machine, and "action")
+ :param: vca_id: Id of the VCA. If None, the default VCA will be used.
+ """
self.last_written_values = Dict(
{
"n2vc_status": status,
},
}
-DB_DATA = Dict(
- {
- "api_endpoints": Dict(
- {"table": "admin", "filter": {"_id": "juju"}, "key": "api_endpoints"}
- )
- }
-)
-
def obj_to_yaml(obj: object) -> str:
"""
--- /dev/null
+# Copyright 2021 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
--- /dev/null
+# Copyright 2021 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Cloud:
+ def __init__(self, name: str, credential_name: str):
+ """
+ Constructor
+
+ :param: name: Name of the Cloud
+ :param: credential_name: Credential name for the Cloud
+ """
+ self.name = name
+ self.credential_name = credential_name
--- /dev/null
+# Copyright 2021 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import typing
+
+from n2vc.config import EnvironConfig, ModelConfig
+from n2vc.store import Store
+from n2vc.vca.cloud import Cloud
+from n2vc.vca.connection_data import ConnectionData
+
+
+class Connection:
+ def __init__(self, store: Store, vca_id: str = None):
+ """
+ Contructor
+
+ :param: store: Store object. Used to communicate wuth the DB
+ :param: vca_id: Id of the VCA. If none specified, the default VCA will be used.
+ """
+ self._data = None
+ self.default = vca_id is None
+ self._vca_id = vca_id
+ self._store = store
+
+ async def load(self):
+ """Load VCA connection data"""
+ await self._load_vca_connection_data()
+
+ @property
+ def is_default(self):
+ return self._vca_id is None
+
+ @property
+ def data(self) -> ConnectionData:
+ return self._data
+
+ async def _load_vca_connection_data(self) -> typing.NoReturn:
+ """
+ Load VCA connection data
+
+ If self._vca_id is None, it will get the VCA data from the Environment variables,
+ and the default VCA will be used. If it is not None, then it means that it will
+ load the credentials from the database (A non-default VCA will be used).
+ """
+ if self._vca_id:
+ self._data = await self._store.get_vca_connection_data(self._vca_id)
+ else:
+ envs = EnvironConfig()
+ # Get endpoints from the DB and ENV. Check if update in the database is needed or not.
+ db_endpoints = await self._store.get_vca_endpoints()
+ env_endpoints = (
+ envs["endpoints"].split(",")
+ if "endpoints" in envs
+ else ["{}:{}".format(envs["host"], envs.get("port", 17070))]
+ )
+
+ db_update_needed = not all(e in db_endpoints for e in env_endpoints)
+
+ endpoints = env_endpoints if db_update_needed else db_endpoints
+ config = {
+ "endpoints": endpoints,
+ "user": envs["user"],
+ "secret": envs["secret"],
+ "cacert": envs["cacert"],
+ "pubkey": envs["pubkey"],
+ "lxd-cloud": envs["cloud"],
+ "lxd-credentials": envs.get("credentials", envs["cloud"]),
+ "k8s-cloud": envs["k8s_cloud"],
+ "k8s-credentials": envs.get("k8s_credentials", envs["k8s_cloud"]),
+ "model-config": ModelConfig(envs),
+ "api-proxy": envs.get("api_proxy", None),
+ }
+ self._data = ConnectionData(**config)
+ if db_update_needed:
+ await self.update_endpoints(endpoints)
+
+ @property
+ def endpoints(self):
+ return self._data.endpoints
+
+ async def update_endpoints(self, endpoints: typing.List[str]):
+ await self._store.update_vca_endpoints(endpoints, self._vca_id)
+ self._data.endpoints = endpoints
+
+ @property
+ def lxd_cloud(self) -> Cloud:
+ return Cloud(self.data.lxd_cloud, self.data.lxd_credentials)
+
+ @property
+ def k8s_cloud(self) -> Cloud:
+ return Cloud(self.data.k8s_cloud, self.data.k8s_credentials)
+
+
+async def get_connection(store: Store, vca_id: str = None) -> Connection:
+ """
+ Get Connection
+
+ Method to get a Connection object with the VCA information loaded
+ """
+ connection = Connection(store, vca_id=vca_id)
+ await connection.load()
+ return connection
--- /dev/null
+# Copyright 2021 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from n2vc.utils import base64_to_cacert
+
+
+class ConnectionData:
+ def __init__(self, **kwargs):
+ """
+ Constructor
+
+ :param: kwargs:
+ endpoints (list): Endpoints of all the Juju controller units
+ user (str): Username for authenticating to the controller
+ secret (str): Secret for authenticating to the controller
+ cacert (str): Base64 encoded CA certificate for authenticating to the controller
+ (optional) pubkey (str): Public key to insert to the charm.
+ This is useful to do `juju ssh`.
+ It is not very useful though.
+ TODO: Test it.
+ (optional) lxd-cloud (str): Name of the cloud to use for lxd proxy charms
+ (optional) lxd-credentials (str): Name of the lxd-cloud credentials
+ (optional) k8s-cloud (str): Name of the cloud to use for k8s proxy charms
+ (optional) k8s-credentials (str): Name of the k8s-cloud credentials
+ (optional) model-config (n2vc.config.ModelConfig): Config to apply in all Juju models
+ (deprecated, optional) api-proxy (str): Proxy IP to reach the controller.
+ Used in case native charms cannot react the controller.
+ """
+ self.endpoints = kwargs["endpoints"]
+ self.user = kwargs["user"]
+ self.secret = kwargs["secret"]
+ self.cacert = base64_to_cacert(kwargs["cacert"])
+ self.pubkey = kwargs.get("pubkey", "")
+ self.lxd_cloud = kwargs.get("lxd-cloud", None)
+ self.lxd_credentials = kwargs.get("lxd-credentials", None)
+ self.k8s_cloud = kwargs.get("k8s-cloud", None)
+ self.k8s_credentials = kwargs.get("k8s-credentials", None)
+ self.model_config = kwargs.get("model-config", {})
+ self.model_config.update({"authorized-keys": self.pubkey})
+ self.api_proxy = kwargs.get("api-proxy", None)
juju==2.8.4
kubernetes==10.0.1
-pyasn1
\ No newline at end of file
+pyasn1
+motor==1.3.1
+retrying-async
+async-timeout==3.0.1
+ # via retrying-async
bcrypt==3.2.0
# via paramiko
cachetools==4.2.1
# via
# juju
# theblues
+motor==1.3.1
+ # via -r requirements.in
mypy-extensions==0.4.3
# via typing-inspect
oauthlib==3.1.0
# via cffi
pymacaroons==0.13.0
# via macaroonbakery
+pymongo==3.11.3
+ # via motor
pynacl==1.4.0
# via
# macaroonbakery
# macaroonbakery
# requests-oauthlib
# theblues
+retrying-async==1.2.0
+ # via -r requirements.in
rsa==4.7.2
# via google-auth
six==1.15.0