From 32862bb30e8b98ef7f21ed64a1327bd7462e7767 Mon Sep 17 00:00:00 2001 From: beierlm Date: Tue, 21 Apr 2020 16:36:35 -0400 Subject: [PATCH] Enable lint, flake8 and unit tests Cleans up non pep compliant code. Adds a simple unit test. Formats according to black. Tox automatically runs lint, flake8 and unit test suite with coverage. To run each individually, execute: tox -e pylint tox -e black tox -e flake8 tox -e cover Note that these are all run for each patch via Jenkins. The full tox suite should be run locally before any commit to ensure it will not fail in Jenkins. Change-Id: I2f87abe3d5086d6d65ac33a27780c498fc7b1cd3 Signed-off-by: beierlm --- .gitignore | 23 + devops-stages/stage-test.sh | 2 +- n2vc/__init__.py | 2 +- n2vc/exceptions.py | 36 +- n2vc/juju_observer.py | 133 ++-- n2vc/k8s_conn.py | 279 ++++--- n2vc/k8s_helm_conn.py | 992 ++++++++++++++----------- n2vc/k8s_juju_conn.py | 394 ++++------ n2vc/loggable.py | 112 +-- n2vc/n2vc_conn.py | 314 ++++---- n2vc/n2vc_juju_conn.py | 1066 ++++++++++++++++----------- n2vc/provisioner.py | 113 ++- n2vc/tests/__init__.py | 13 + n2vc/tests/unit/__init__.py | 13 + n2vc/tests/unit/test_provisioner.py | 158 ++++ n2vc/vnf.py | 570 +++++++------- requirements.txt | 19 +- test-requirements.txt | 18 + tox.ini | 86 +-- 19 files changed, 2415 insertions(+), 1928 deletions(-) create mode 100644 n2vc/tests/__init__.py create mode 100644 n2vc/tests/unit/__init__.py create mode 100644 n2vc/tests/unit/test_provisioner.py create mode 100644 test-requirements.txt diff --git a/.gitignore b/.gitignore index 543898d..11426c9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,17 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + __pycache__ *.pyc .tox/ @@ -7,3 +21,12 @@ dist/ .cache/ .local/ N2VC.egg-info/ +.coverage +cover +coverage.xml +.tox +nosetests.xml +.cache +.vscode/ +.project +.pydevproject diff --git a/devops-stages/stage-test.sh b/devops-stages/stage-test.sh index 9c960cb..a4a0604 100755 --- a/devops-stages/stage-test.sh +++ b/devops-stages/stage-test.sh @@ -13,4 +13,4 @@ # limitations under the License. #!/bin/sh -#tox +tox \ No newline at end of file diff --git a/n2vc/__init__.py b/n2vc/__init__.py index ac8adf5..d97c31c 100644 --- a/n2vc/__init__.py +++ b/n2vc/__init__.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -version = '0.0.2' +version = "0.0.2" diff --git a/n2vc/exceptions.py b/n2vc/exceptions.py index 815d4ea..09f3573 100644 --- a/n2vc/exceptions.py +++ b/n2vc/exceptions.py @@ -41,7 +41,7 @@ class AuthenticationFailed(Exception): """The authentication for the specified user failed.""" -class NotImplemented(Exception): +class MethodNotImplemented(Exception): """The method is not implemented.""" @@ -50,7 +50,7 @@ class N2VCException(Exception): N2VC exception base class """ - def __init__(self, message: str = ''): + def __init__(self, message: str = ""): Exception.__init__(self, message) self.message = message @@ -58,7 +58,7 @@ class N2VCException(Exception): return self.message def __repr__(self): - return '{}({})'.format(type(self), self.message) + return "{}({})".format(type(self), self.message) class N2VCBadArgumentsException(N2VCException): @@ -66,12 +66,14 @@ class N2VCBadArgumentsException(N2VCException): Bad argument values exception """ - def __init__(self, message: str = '', bad_args: list = None): + def __init__(self, message: str = "", bad_args: list = None): N2VCException.__init__(self, message=message) self.bad_args = bad_args def __str__(self): - return '<{}> Bad arguments: {} -> {}'.format(type(self), super().__str__(), self.bad_args) + return "<{}> Bad arguments: {} -> {}".format( + type(self), super().__str__(), self.bad_args + ) class N2VCConnectionException(N2VCException): @@ -79,12 +81,14 @@ class N2VCConnectionException(N2VCException): Error connecting to VCA """ - def __init__(self, message: str = '', url: str = None): + def __init__(self, message: str = "", url: str = None): N2VCException.__init__(self, message=message) self.url = url def __str__(self): - return '<{}> Connection to {} failed: {}'.format(type(self), self.url, super().__str__()) + return "<{}> Connection to {} failed: {}".format( + type(self), self.url, super().__str__() + ) class N2VCTimeoutException(N2VCException): @@ -92,12 +96,12 @@ class N2VCTimeoutException(N2VCException): Timeout """ - def __init__(self, message: str = '', timeout: str = ''): + def __init__(self, message: str = "", timeout: str = ""): N2VCException.__init__(self, message=message) self.timeout = timeout def __str__(self): - return '<{}> {} timeout: {}'.format(type(self), self.timeout, super().__str__()) + return "<{}> {} timeout: {}".format(type(self), self.timeout, super().__str__()) class N2VCExecutionException(N2VCException): @@ -105,12 +109,14 @@ class N2VCExecutionException(N2VCException): Error executing primitive """ - def __init__(self, message: str = '', primitive_name: str = ''): + def __init__(self, message: str = "", primitive_name: str = ""): N2VCException.__init__(self, message=message) self.primitive_name = primitive_name def __str__(self): - return '<{}> Error executing primitive {} failed: {}'.format(type(self), self.primitive_name, super().__str__()) + return "<{}> Error executing primitive {} failed: {}".format( + type(self), self.primitive_name, super().__str__() + ) class N2VCInvalidCertificate(N2VCException): @@ -118,11 +124,11 @@ class N2VCInvalidCertificate(N2VCException): Invalid certificate """ - def __init__(self, message: str = ''): + def __init__(self, message: str = ""): N2VCException.__init__(self, message=message) def __str__(self): - return '<{}> Invalid certificate: {}'.format(type(self), super().__str__()) + return "<{}> Invalid certificate: {}".format(type(self), super().__str__()) class N2VCNotFound(N2VCException): @@ -130,11 +136,11 @@ class N2VCNotFound(N2VCException): Not found """ - def __init__(self, message: str = ''): + def __init__(self, message: str = ""): N2VCException.__init__(self, message=message) def __str__(self): - return '<{}> Not found: {}'.format(type(self), super().__str__()) + return "<{}> Not found: {}".format(type(self), super().__str__()) class K8sException(Exception): diff --git a/n2vc/juju_observer.py b/n2vc/juju_observer.py index e2f0470..7ed3dee 100644 --- a/n2vc/juju_observer.py +++ b/n2vc/juju_observer.py @@ -23,13 +23,13 @@ import asyncio import time -from juju.model import ModelObserver, Model -from juju.machine import Machine -from juju.application import Application from juju.action import Action +from juju.application import Application +from juju.machine import Machine +from juju.model import ModelObserver, Model -from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status from n2vc.exceptions import N2VCTimeoutException +from n2vc.n2vc_conn import N2VCConnector, juju_status_2_osm_status class _Entity: @@ -42,7 +42,6 @@ class _Entity: class JujuModelObserver(ModelObserver): - def __init__(self, n2vc: N2VCConnector, model: Model): self.n2vc = n2vc self.model = model @@ -54,11 +53,14 @@ class JujuModelObserver(ModelObserver): def register_machine(self, machine: Machine, db_dict: dict): try: entity_id = machine.entity_id - except Exception as e: + except Exception: # no entity_id aatribute, try machine attribute entity_id = machine.machine - # self.n2vc.debug(msg='Registering machine for change notifications: {}'.format(entity_id)) - entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict) + # self.n2vc.debug( + # msg='Registering machine for change notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, entity_type="machine", obj=machine, db_dict=db_dict + ) self.machines[entity_id] = entity def unregister_machine(self, machine_id: str): @@ -70,8 +72,14 @@ class JujuModelObserver(ModelObserver): def register_application(self, application: Application, db_dict: dict): entity_id = application.entity_id - # self.n2vc.debug(msg='Registering application for change notifications: {}'.format(entity_id)) - entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict) + # self.n2vc.debug( + # msg='Registering application for change notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, + entity_type="application", + obj=application, + db_dict=db_dict, + ) self.applications[entity_id] = entity def unregister_application(self, application_id: str): @@ -83,8 +91,11 @@ class JujuModelObserver(ModelObserver): def register_action(self, action: Action, db_dict: dict): entity_id = action.entity_id - # self.n2vc.debug(msg='Registering action for changes notifications: {}'.format(entity_id)) - entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict) + # self.n2vc.debug( + # msg='Registering action for changes notifications: {}'.format(entity_id)) + entity = _Entity( + entity_id=entity_id, entity_type="action", obj=action, db_dict=db_dict + ) self.actions[entity_id] = entity def unregister_action(self, action_id: str): @@ -95,74 +106,81 @@ class JujuModelObserver(ModelObserver): return action_id in self.actions async def wait_for_machine( - self, - machine_id: str, - progress_timeout: float = None, - total_timeout: float = None) -> int: + self, + machine_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: if not self.is_machine_registered(machine_id): return - self.n2vc.debug('Waiting for machine completed: {}'.format(machine_id)) + self.n2vc.debug("Waiting for machine completed: {}".format(machine_id)) # wait for a final state entity = self.machines[machine_id] return await self._wait_for_entity( entity=entity, - field_to_check='agent_status', - final_states_list=['started'], + field_to_check="agent_status", + final_states_list=["started"], progress_timeout=progress_timeout, - total_timeout=total_timeout) + total_timeout=total_timeout, + ) async def wait_for_application( - self, - application_id: str, - progress_timeout: float = None, - total_timeout: float = None) -> int: + self, + application_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: if not self.is_application_registered(application_id): return - self.n2vc.debug('Waiting for application completed: {}'.format(application_id)) + self.n2vc.debug("Waiting for application completed: {}".format(application_id)) # application statuses: unknown, active, waiting # wait for a final state entity = self.applications[application_id] return await self._wait_for_entity( entity=entity, - field_to_check='status', - final_states_list=['active', 'blocked'], + field_to_check="status", + final_states_list=["active", "blocked"], progress_timeout=progress_timeout, - total_timeout=total_timeout) + total_timeout=total_timeout, + ) async def wait_for_action( - self, - action_id: str, - progress_timeout: float = None, - total_timeout: float = None) -> int: + self, + action_id: str, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: if not self.is_action_registered(action_id): return - self.n2vc.debug('Waiting for action completed: {}'.format(action_id)) + self.n2vc.debug("Waiting for action completed: {}".format(action_id)) # action statuses: pending, running, completed, failed, cancelled # wait for a final state entity = self.actions[action_id] return await self._wait_for_entity( entity=entity, - field_to_check='status', - final_states_list=['completed', 'failed', 'cancelled'], + field_to_check="status", + final_states_list=["completed", "failed", "cancelled"], progress_timeout=progress_timeout, - total_timeout=total_timeout) + total_timeout=total_timeout, + ) async def _wait_for_entity( - self, - entity: _Entity, - field_to_check: str, - final_states_list: list, - progress_timeout: float = None, - total_timeout: float = None) -> int: + self, + entity: _Entity, + field_to_check: str, + final_states_list: list, + progress_timeout: float = None, + total_timeout: float = None, + ) -> int: # default values for no timeout if total_timeout is None: @@ -176,8 +194,10 @@ class JujuModelObserver(ModelObserver): if now >= total_end: raise N2VCTimeoutException( - message='Total timeout {} seconds, {}: {}'.format(total_timeout, entity.entity_type, entity.entity_id), - timeout='total' + message="Total timeout {} seconds, {}: {}".format( + total_timeout, entity.entity_type, entity.entity_id + ), + timeout="total", ) # update next progress timeout @@ -195,10 +215,11 @@ class JujuModelObserver(ModelObserver): if await _wait_for_event_or_timeout(entity.event, next_timeout): entity.event.clear() else: - message = 'Progress timeout {} seconds, {}}: {}'\ - .format(progress_timeout, entity.entity_type, entity.entity_id) + message = "Progress timeout {} seconds, {}}: {}".format( + progress_timeout, entity.entity_type, entity.entity_id + ) self.n2vc.debug(message) - raise N2VCTimeoutException(message=message, timeout='progress') + raise N2VCTimeoutException(message=message, timeout="progress") # self.n2vc.debug('End of wait. Final state: {}, retries: {}' # .format(entity.obj.__getattribute__(field_to_check), retries)) return retries @@ -212,7 +233,7 @@ class JujuModelObserver(ModelObserver): # self.n2vc.debug('on_change(): type: {}, entity: {}, id: {}' # .format(delta.type, delta.entity, new.entity_id)) - if delta.entity == 'machine': + if delta.entity == "machine": # check registered machine if new.entity_id not in self.machines: @@ -224,13 +245,13 @@ class JujuModelObserver(ModelObserver): status=juju_status_2_osm_status(delta.entity, new.agent_status), detailed_status=new.status_message, vca_status=new.status, - entity_type='machine' + entity_type="machine", ) # set event for this machine self.machines[new.entity_id].event.set() - elif delta.entity == 'application': + elif delta.entity == "application": # check registered application if new.entity_id not in self.applications: @@ -242,16 +263,16 @@ class JujuModelObserver(ModelObserver): status=juju_status_2_osm_status(delta.entity, new.status), detailed_status=new.status_message, vca_status=new.status, - entity_type='application' + entity_type="application", ) # set event for this application self.applications[new.entity_id].event.set() - elif delta.entity == 'unit': + elif delta.entity == "unit": # get the application for this unit - application_id = delta.data['application'] + application_id = delta.data["application"] # check registered application if application_id not in self.applications: @@ -264,13 +285,13 @@ class JujuModelObserver(ModelObserver): status=juju_status_2_osm_status(delta.entity, new.workload_status), detailed_status=new.workload_status_message, vca_status=new.workload_status, - entity_type='unit' + entity_type="unit", ) # set event for this application self.applications[application_id].event.set() - elif delta.entity == 'action': + elif delta.entity == "action": # check registered action if new.entity_id not in self.actions: @@ -282,7 +303,7 @@ class JujuModelObserver(ModelObserver): status=juju_status_2_osm_status(delta.entity, new.status), detailed_status=new.status, vca_status=new.status, - entity_type='action' + entity_type="action", ) # set event for this application diff --git a/n2vc/k8s_conn.py b/n2vc/k8s_conn.py index b1f3230..a3ad29a 100644 --- a/n2vc/k8s_conn.py +++ b/n2vc/k8s_conn.py @@ -20,26 +20,21 @@ # contact with: nfvlabs@tid.es ## -import asyncio -from n2vc.loggable import Loggable import abc +import asyncio import time +from n2vc.loggable import Loggable -class K8sConnector(abc.ABC, Loggable): +class K8sConnector(abc.ABC, Loggable): """ - ################################################################################################## - ########################################## P U B L I C ########################################### - ################################################################################################## + #################################################################################### + ################################### P U B L I C #################################### + #################################################################################### """ - def __init__( - self, - db: object, - log: object = None, - on_update_db=None - ): + def __init__(self, db: object, log: object = None, on_update_db=None): """ :param db: database object to write current operation status @@ -48,7 +43,7 @@ class K8sConnector(abc.ABC, Loggable): """ # parent class - Loggable.__init__(self, log=log, log_to_console=True, prefix='\nK8S') + Loggable.__init__(self, log=log, log_to_console=True, prefix="\nK8S") # self.log.info('Initializing generic K8S connector') @@ -60,31 +55,26 @@ class K8sConnector(abc.ABC, Loggable): @abc.abstractmethod async def init_env( - self, - k8s_creds: str, - namespace: str = 'kube-system', - reuse_cluster_uuid=None + self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None ) -> (str, bool): """ - It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides: + It prepares a given K8s cluster environment to run Charts or juju Bundles on + both sides: client (OSM) server (Tiller/Charm) - :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config' - :param namespace: optional namespace to be used for the K8s engine (helm tiller, juju). - By default, 'kube-system' will be used + :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid + '.kube/config' + :param namespace: optional namespace to be used for the K8s engine (helm + tiller, juju). By default, 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse - :return: uuid of the K8s cluster and True if connector has installed some software in the cluster - (on error, an exception will be raised) + :return: uuid of the K8s cluster and True if connector has installed some + software in the cluster (on error, an exception will be raised) """ @abc.abstractmethod async def repo_add( - self, - cluster_uuid: str, - name: str, - url: str, - repo_type: str = 'chart' + self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" ): """ Add a new repository to OSM database @@ -97,10 +87,7 @@ class K8sConnector(abc.ABC, Loggable): """ @abc.abstractmethod - async def repo_list( - self, - cluster_uuid: str - ): + async def repo_list(self, cluster_uuid: str): """ Get the list of registered repositories @@ -109,11 +96,7 @@ class K8sConnector(abc.ABC, Loggable): """ @abc.abstractmethod - async def repo_remove( - self, - cluster_uuid: str, - name: str - ): + async def repo_remove(self, cluster_uuid: str, name: str): """ Remove a repository from OSM @@ -123,66 +106,65 @@ class K8sConnector(abc.ABC, Loggable): """ @abc.abstractmethod - async def synchronize_repos( - self, - cluster_uuid: str, - name: str - ): + async def synchronize_repos(self, cluster_uuid: str, name: str): """ Synchronizes the list of repositories created in the cluster with the repositories added by the NBI :param cluster_uuid: the cluster - :return: List of repositories deleted from the cluster and dictionary with repos added + :return: List of repositories deleted from the cluster and dictionary with + repos added """ @abc.abstractmethod async def reset( - self, - cluster_uuid: str, - force: bool = False, - uninstall_sw: bool = False + self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False ) -> bool: """ - Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list of known K8s clusters. - Intended to be used e.g. when the NS instance is deleted. + Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list + of known K8s clusters. Intended to be used e.g. when the NS instance is deleted. :param cluster_uuid: UUID of a K8s cluster known by OSM. :param force: force deletion, even in case there are deployed releases - :param uninstall_sw: flag to indicate that sw uninstallation from software is needed + :param uninstall_sw: flag to indicate that sw uninstallation from software is + needed :return: str: kdu_instance generated by helm """ @abc.abstractmethod async def install( - self, - cluster_uuid: str, - kdu_model: str, - atomic: bool = True, - timeout: float = 300, - params: dict = None, - db_dict: dict = None, - kdu_name: str = None, - namespace: str = None + self, + cluster_uuid: str, + kdu_model: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, + kdu_name: str = None, + namespace: str = None, ): """ - Deploys of a new KDU instance. It would implicitly rely on the `install` call to deploy the Chart/Bundle - properly parametrized (in practice, this call would happen before any _initial-config-primitive_ - of the VNF is called). + Deploys of a new KDU instance. It would implicitly rely on the `install` call + to deploy the Chart/Bundle properly parametrized (in practice, this call would + happen before any _initial-config-primitive_of the VNF is called). :param cluster_uuid: UUID of a K8s cluster known by OSM - :param kdu_model: chart/bundle:version reference (string), which can be either of these options: + :param kdu_model: chart/bundle:version reference (string), which can be either + of these options: - a name of chart/bundle available via the repos known by OSM - a path to a packaged chart/bundle - a path to an unpacked chart/bundle directory or a URL - :param atomic: If set, installation process purges chart/bundle on fail, also will wait until - all the K8s objects are active - :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to - Helm default timeout: 300s) - :param params: dictionary of key-value pairs for instantiation parameters (overriding default values) + :param atomic: If set, installation process purges chart/bundle on fail, also + will wait until all the K8s objects are active + :param timeout: Time in seconds to wait for the install of the chart/bundle + (defaults to Helm default timeout: 300s) + :param params: dictionary of key-value pairs for instantiation parameters + (overriding default values) :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + It contains a dict with {collection: , filter: {}, + path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.K8S.3"} :param kdu_name: Name of the KDU instance to be installed :param namespace: K8s namespace to use for the KDU instance :return: True if successful @@ -190,63 +172,64 @@ class K8sConnector(abc.ABC, Loggable): @abc.abstractmethod async def upgrade( - self, - cluster_uuid: str, - kdu_instance: str, - kdu_model: str = None, - atomic: bool = True, - timeout: float = 300, - params: dict = None, - db_dict: dict = None + self, + cluster_uuid: str, + kdu_instance: str, + kdu_model: str = None, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, ): """ - Upgrades an existing KDU instance. It would implicitly use the `upgrade` call over an existing Chart/Bundle. - It can be used both to upgrade the chart or to reconfigure it. This would be exposed as Day-2 primitive. + Upgrades an existing KDU instance. It would implicitly use the `upgrade` call + over an existing Chart/Bundle. It can be used both to upgrade the chart or to + reconfigure it. This would be exposed as Day-2 primitive. :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance to be updated :param kdu_model: new chart/bundle:version reference - :param atomic: rollback in case of fail and wait for pods and services are available - :param timeout: Time in seconds to wait for the install of the chart/bundle (defaults to - Helm default timeout: 300s) + :param atomic: rollback in case of fail and wait for pods and services are + available + :param timeout: Time in seconds to wait for the install of the chart/bundle + (defaults to Helm default timeout: 300s) :param params: new dictionary of key-value pairs for instantiation parameters :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} + It contains a dict with {collection: , filter: {}, + path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.K8S.3"} :return: reference to the new revision number of the KDU instance """ @abc.abstractmethod async def rollback( - self, - cluster_uuid: str, - kdu_instance: str, - revision=0, - db_dict: dict = None + self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): """ - Rolls back a previous update of a KDU instance. It would implicitly use the `rollback` call. - It can be used both to rollback from a Chart/Bundle version update or from a reconfiguration. - This would be exposed as Day-2 primitive. + Rolls back a previous update of a KDU instance. It would implicitly use the + `rollback` call. It can be used both to rollback from a Chart/Bundle version + update or from a reconfiguration. This would be exposed as Day-2 primitive. :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance - :param revision: revision to which revert changes. If omitted, it will revert the last update only + :param revision: revision to which revert changes. If omitted, it will revert + the last update only :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.K8S.3"} - :return:If successful, reference to the current active revision of the KDU instance after the rollback + It contains a dict with {collection: , filter: {}, + path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.K8S.3"} + :return:If successful, reference to the current active revision of the KDU + instance after the rollback """ @abc.abstractmethod - async def uninstall( - self, - cluster_uuid: str, - kdu_instance: str - ): + async def uninstall(self, cluster_uuid: str, kdu_instance: str): """ - Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen - after all _terminate-config-primitive_ of the VNF are invoked). + Removes an existing KDU instance. It would implicitly use the `delete` call + (this call would happen after all _terminate-config-primitive_ of the VNF are + invoked). :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance to be deleted @@ -276,48 +259,41 @@ class K8sConnector(abc.ABC, Loggable): """ @abc.abstractmethod - async def inspect_kdu( - self, - kdu_model: str, - repo_url: str = None - ) -> str: + async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str: """ These calls will retrieve from the Chart/Bundle: - - The list of configurable values and their defaults (e.g. in Charts, it would retrieve - the contents of `values.yaml`). - - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle. + - The list of configurable values and their defaults (e.g. in Charts, + it would retrieve the contents of `values.yaml`). + - If available, any embedded help file (e.g. `readme.md`) embedded in the + Chart/Bundle. :param kdu_model: chart/bundle reference - :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, even stable URL) + :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, + even stable URL) :return: - If successful, it will return the available parameters and their default values as provided by the backend. + If successful, it will return the available parameters and their default values + as provided by the backend. """ @abc.abstractmethod - async def help_kdu( - self, - kdu_model: str, - repo_url: str = None - ) -> str: + async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: """ :param kdu_model: chart/bundle reference - :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, even stable URL) + :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, + even stable URL) :return: If successful, it will return the contents of the 'readme.md' """ @abc.abstractmethod - async def status_kdu( - self, - cluster_uuid: str, - kdu_instance: str - ) -> str: + async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str: """ - This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve - the _composition_ (i.e. K8s objects) and _specific values_ of the configuration parameters applied - to a given instance. This call would be based on the `status` call. + This call would retrieve tha current state of a given KDU instance. It would be + would allow to retrieve the _composition_ (i.e. K8s objects) and _specific + values_ of the configuration parameters applied to a given instance. This call + would be based on the `status` call. :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance @@ -330,65 +306,64 @@ class K8sConnector(abc.ABC, Loggable): - SUPERSEDED - FAILED or - DELETING - - List of `resources` (objects) that this release consists of, sorted by kind, and the status of those resources + - List of `resources` (objects) that this release consists of, sorted by kind, + and the status of those resources - Last `deployment_time`. """ """ - ################################################################################################## - ########################################## P R I V A T E ######################################### - ################################################################################################## + #################################################################################### + ################################### P R I V A T E ################################## + #################################################################################### """ async def write_app_status_to_db( - self, - db_dict: dict, - status: str, - detailed_status: str, - operation: str + self, db_dict: dict, status: str, detailed_status: str, operation: str ) -> bool: if not self.db: - self.warning('No db => No database write') + self.warning("No db => No database write") return False if not db_dict: - self.warning('No db_dict => No database write') + self.warning("No db_dict => No database write") return False - self.log.debug('status={}'.format(status)) + self.log.debug("status={}".format(status)) try: - the_table = db_dict['collection'] - the_filter = db_dict['filter'] - the_path = db_dict['path'] - if not the_path[-1] == '.': - the_path = the_path + '.' + the_table = db_dict["collection"] + the_filter = db_dict["filter"] + the_path = db_dict["path"] + if not the_path[-1] == ".": + the_path = the_path + "." update_dict = { - the_path + 'operation': operation, - the_path + 'status': status, - the_path + 'detailed-status': detailed_status, - the_path + 'status-time': str(time.time()), + the_path + "operation": operation, + the_path + "status": status, + the_path + "detailed-status": detailed_status, + the_path + "status-time": str(time.time()), } self.db.set_one( table=the_table, q_filter=the_filter, update_dict=update_dict, - fail_on_empty=True + fail_on_empty=True, ) # database callback if self.on_update_db: if asyncio.iscoroutinefunction(self.on_update_db): - await self.on_update_db(the_table, the_filter, the_path, update_dict) + await self.on_update_db( + the_table, the_filter, the_path, update_dict + ) else: self.on_update_db(the_table, the_filter, the_path, update_dict) return True except Exception as e: - self.log.info('Exception writing status to database: {}'.format(e)) + self.log.info("Exception writing status to database: {}".format(e)) return False diff --git a/n2vc/k8s_helm_conn.py b/n2vc/k8s_helm_conn.py index d3fbed6..fdfc443 100644 --- a/n2vc/k8s_helm_conn.py +++ b/n2vc/k8s_helm_conn.py @@ -20,34 +20,35 @@ # contact with: nfvlabs@tid.es ## -import subprocess +import asyncio import os +import random import shutil -import asyncio +import subprocess import time -import yaml from uuid import uuid4 -import random -from n2vc.k8s_conn import K8sConnector + from n2vc.exceptions import K8sException +from n2vc.k8s_conn import K8sConnector +import yaml class K8sHelmConnector(K8sConnector): """ - ################################################################################################## - ########################################## P U B L I C ########################################### - ################################################################################################## + #################################################################################### + ################################### P U B L I C #################################### + #################################################################################### """ def __init__( - self, - fs: object, - db: object, - kubectl_command: str = '/usr/bin/kubectl', - helm_command: str = '/usr/bin/helm', - log: object = None, - on_update_db=None + self, + fs: object, + db: object, + kubectl_command: str = "/usr/bin/kubectl", + helm_command: str = "/usr/bin/helm", + log: object = None, + on_update_db=None, ): """ @@ -60,14 +61,9 @@ class K8sHelmConnector(K8sConnector): """ # parent class - K8sConnector.__init__( - self, - db=db, - log=log, - on_update_db=on_update_db - ) + K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db) - self.log.info('Initializing K8S Helm connector') + self.log.info("Initializing K8S Helm connector") # random numbers for release name generation random.seed(time.time()) @@ -84,32 +80,37 @@ class K8sHelmConnector(K8sConnector): self._check_file_exists(filename=helm_command, exception_if_not_exists=True) # initialize helm client-only - self.log.debug('Initializing helm client-only...') - command = '{} init --client-only'.format(self._helm_command) + self.log.debug("Initializing helm client-only...") + command = "{} init --client-only".format(self._helm_command) try: - asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False)) + asyncio.ensure_future( + self._local_async_exec(command=command, raise_exception_on_error=False) + ) # loop = asyncio.get_event_loop() - # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False)) + # loop.run_until_complete(self._local_async_exec(command=command, + # raise_exception_on_error=False)) except Exception as e: - self.warning(msg='helm init failed (it was already initialized): {}'.format(e)) + self.warning( + msg="helm init failed (it was already initialized): {}".format(e) + ) - self.log.info('K8S Helm connector initialized') + self.log.info("K8S Helm connector initialized") async def init_env( - self, - k8s_creds: str, - namespace: str = 'kube-system', - reuse_cluster_uuid=None + self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None ) -> (str, bool): """ It prepares a given K8s cluster environment to run Charts on both sides: client (OSM) server (Tiller) - :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config' - :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used + :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid + '.kube/config' + :param namespace: optional namespace to be used for helm. By default, + 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse - :return: uuid of the K8s cluster and True if connector has installed some software in the cluster + :return: uuid of the K8s cluster and True if connector has installed some + software in the cluster (on error, an exception will be raised) """ @@ -117,19 +118,23 @@ class K8sHelmConnector(K8sConnector): if not cluster_uuid: cluster_uuid = str(uuid4()) - self.log.debug('Initializing K8S environment. namespace: {}'.format(namespace)) + self.log.debug("Initializing K8S environment. namespace: {}".format(namespace)) # create config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) f = open(config_filename, "w") f.write(k8s_creds) f.close() # check if tiller pod is up in cluster - command = '{} --kubeconfig={} --namespace={} get deployments'\ - .format(self.kubectl_command, config_filename, namespace) - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + command = "{} --kubeconfig={} --namespace={} get deployments".format( + self.kubectl_command, config_filename, namespace + ) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True + ) output_table = K8sHelmConnector._output_to_table(output=output) @@ -137,90 +142,98 @@ class K8sHelmConnector(K8sConnector): already_initialized = False try: for row in output_table: - if row[0].startswith('tiller-deploy'): + if row[0].startswith("tiller-deploy"): already_initialized = True break - except Exception as e: + except Exception: pass # helm init n2vc_installed_sw = False if not already_initialized: - self.log.info('Initializing helm in client and server: {}'.format(cluster_uuid)) - command = '{} --kubeconfig={} --tiller-namespace={} --home={} init'\ - .format(self._helm_command, config_filename, namespace, helm_dir) - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + self.log.info( + "Initializing helm in client and server: {}".format(cluster_uuid) + ) + command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format( + self._helm_command, config_filename, namespace, helm_dir + ) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True + ) n2vc_installed_sw = True else: # check client helm installation - check_file = helm_dir + '/repository/repositories.yaml' - if not self._check_file_exists(filename=check_file, exception_if_not_exists=False): - self.log.info('Initializing helm in client: {}'.format(cluster_uuid)) - command = '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\ - .format(self._helm_command, config_filename, namespace, helm_dir) - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + check_file = helm_dir + "/repository/repositories.yaml" + if not self._check_file_exists( + filename=check_file, exception_if_not_exists=False + ): + self.log.info("Initializing helm in client: {}".format(cluster_uuid)) + command = ( + "{} --kubeconfig={} --tiller-namespace={} " + "--home={} init --client-only" + ).format(self._helm_command, config_filename, namespace, helm_dir) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True + ) else: - self.log.info('Helm client already initialized') + self.log.info("Helm client already initialized") - self.log.info('Cluster initialized {}'.format(cluster_uuid)) + self.log.info("Cluster initialized {}".format(cluster_uuid)) return cluster_uuid, n2vc_installed_sw async def repo_add( - self, - cluster_uuid: str, - name: str, - url: str, - repo_type: str = 'chart' + self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" ): - self.log.debug('adding {} repository {}. URL: {}'.format(repo_type, name, url)) + self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url)) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) # helm repo update - command = '{} --kubeconfig={} --home={} repo update'.format(self._helm_command, config_filename, helm_dir) - self.log.debug('updating repo: {}'.format(command)) + command = "{} --kubeconfig={} --home={} repo update".format( + self._helm_command, config_filename, helm_dir + ) + self.log.debug("updating repo: {}".format(command)) await self._local_async_exec(command=command, raise_exception_on_error=False) # helm repo add name url - command = '{} --kubeconfig={} --home={} repo add {} {}'\ - .format(self._helm_command, config_filename, helm_dir, name, url) - self.log.debug('adding repo: {}'.format(command)) + command = "{} --kubeconfig={} --home={} repo add {} {}".format( + self._helm_command, config_filename, helm_dir, name, url + ) + self.log.debug("adding repo: {}".format(command)) await self._local_async_exec(command=command, raise_exception_on_error=True) - async def repo_list( - self, - cluster_uuid: str - ) -> list: + async def repo_list(self, cluster_uuid: str) -> list: """ Get the list of registered repositories :return: list of registered repositories: [ (name, url) .... ] """ - self.log.debug('list repositories for cluster {}'.format(cluster_uuid)) + self.log.debug("list repositories for cluster {}".format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) - command = '{} --kubeconfig={} --home={} repo list --output yaml'\ - .format(self._helm_command, config_filename, helm_dir) + command = "{} --kubeconfig={} --home={} repo list --output yaml".format( + self._helm_command, config_filename, helm_dir + ) - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True + ) if output and len(output) > 0: return yaml.load(output, Loader=yaml.SafeLoader) else: return [] - async def repo_remove( - self, - cluster_uuid: str, - name: str - ): + async def repo_remove(self, cluster_uuid: str, name: str): """ Remove a repository from OSM @@ -229,29 +242,31 @@ class K8sHelmConnector(K8sConnector): :return: True if successful """ - self.log.debug('list repositories for cluster {}'.format(cluster_uuid)) + self.log.debug("list repositories for cluster {}".format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) - command = '{} --kubeconfig={} --home={} repo remove {}'\ - .format(self._helm_command, config_filename, helm_dir, name) + command = "{} --kubeconfig={} --home={} repo remove {}".format( + self._helm_command, config_filename, helm_dir, name + ) await self._local_async_exec(command=command, raise_exception_on_error=True) async def reset( - self, - cluster_uuid: str, - force: bool = False, - uninstall_sw: bool = False + self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False ) -> bool: - self.log.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid)) + self.log.debug( + "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid) + ) # get kube and helm directories - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=False) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=False + ) # uninstall releases if needed releases = await self.instances_list(cluster_uuid=cluster_uuid) @@ -259,107 +274,134 @@ class K8sHelmConnector(K8sConnector): if force: for r in releases: try: - kdu_instance = r.get('Name') - chart = r.get('Chart') - self.log.debug('Uninstalling {} -> {}'.format(chart, kdu_instance)) - await self.uninstall(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + kdu_instance = r.get("Name") + chart = r.get("Chart") + self.log.debug( + "Uninstalling {} -> {}".format(chart, kdu_instance) + ) + await self.uninstall( + cluster_uuid=cluster_uuid, kdu_instance=kdu_instance + ) except Exception as e: - self.log.error('Error uninstalling release {}: {}'.format(kdu_instance, e)) + self.log.error( + "Error uninstalling release {}: {}".format(kdu_instance, e) + ) else: - msg = 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\ - .format(cluster_uuid) + msg = ( + "Cluster has releases and not force. Cannot reset K8s " + "environment. Cluster uuid: {}" + ).format(cluster_uuid) self.log.error(msg) raise K8sException(msg) if uninstall_sw: - self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) + self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid)) # find namespace for tiller pod - command = '{} --kubeconfig={} get deployments --all-namespaces'\ - .format(self.kubectl_command, config_filename) - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + command = "{} --kubeconfig={} get deployments --all-namespaces".format( + self.kubectl_command, config_filename + ) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=False + ) output_table = K8sHelmConnector._output_to_table(output=output) namespace = None for r in output_table: try: - if 'tiller-deploy' in r[1]: + if "tiller-deploy" in r[1]: namespace = r[0] break - except Exception as e: + except Exception: pass else: - msg = 'Tiller deployment not found in cluster {}'.format(cluster_uuid) + msg = "Tiller deployment not found in cluster {}".format(cluster_uuid) self.log.error(msg) - self.log.debug('namespace for tiller: {}'.format(namespace)) + self.log.debug("namespace for tiller: {}".format(namespace)) - force_str = '--force' + force_str = "--force" if namespace: # delete tiller deployment - self.log.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid, namespace)) - command = '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\ - .format(self.kubectl_command, namespace, config_filename, force_str) - await self._local_async_exec(command=command, raise_exception_on_error=False) + self.log.debug( + "Deleting tiller deployment for cluster {}, namespace {}".format( + cluster_uuid, namespace + ) + ) + command = ( + "{} --namespace {} --kubeconfig={} {} delete deployment " + "tiller-deploy" + ).format(self.kubectl_command, namespace, config_filename, force_str) + await self._local_async_exec( + command=command, raise_exception_on_error=False + ) # uninstall tiller from cluster - self.log.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid)) - command = '{} --kubeconfig={} --home={} reset'\ - .format(self._helm_command, config_filename, helm_dir) - self.log.debug('resetting: {}'.format(command)) - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + self.log.debug( + "Uninstalling tiller from cluster {}".format(cluster_uuid) + ) + command = "{} --kubeconfig={} --home={} reset".format( + self._helm_command, config_filename, helm_dir + ) + self.log.debug("resetting: {}".format(command)) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True + ) else: - self.log.debug('namespace not found') + self.log.debug("namespace not found") # delete cluster directory - dir = self.fs.path + '/' + cluster_uuid - self.log.debug('Removing directory {}'.format(dir)) - shutil.rmtree(dir, ignore_errors=True) + direct = self.fs.path + "/" + cluster_uuid + self.log.debug("Removing directory {}".format(direct)) + shutil.rmtree(direct, ignore_errors=True) return True async def install( - self, - cluster_uuid: str, - kdu_model: str, - atomic: bool = True, - timeout: float = 300, - params: dict = None, - db_dict: dict = None, - kdu_name: str = None, - namespace: str = None + self, + cluster_uuid: str, + kdu_model: str, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, + kdu_name: str = None, + namespace: str = None, ): - self.log.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid)) + self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) # params to str # params_str = K8sHelmConnector._params_to_set_option(params) - params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params) + params_str, file_to_delete = self._params_to_file_option( + cluster_uuid=cluster_uuid, params=params + ) - timeout_str = '' + timeout_str = "" if timeout: - timeout_str = '--timeout {}'.format(timeout) + timeout_str = "--timeout {}".format(timeout) # atomic - atomic_str = '' + atomic_str = "" if atomic: - atomic_str = '--atomic' + atomic_str = "--atomic" # namespace - namespace_str = '' + namespace_str = "" if namespace: namespace_str = "--namespace {}".format(namespace) # version - version_str = '' - if ':' in kdu_model: - parts = kdu_model.split(sep=':') + version_str = "" + if ":" in kdu_model: + parts = kdu_model.split(sep=":") if len(parts) == 2: - version_str = '--version {}'.format(parts[1]) + version_str = "--version {}".format(parts[1]) kdu_model = parts[0] # generate a name for the release. Then, check if already exists @@ -370,7 +412,7 @@ class K8sHelmConnector(K8sConnector): result = await self._status_kdu( cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, - show_error_log=False + show_error_log=False, ) if result is not None: # instance already exists: generate a new one @@ -379,17 +421,29 @@ class K8sHelmConnector(K8sConnector): pass # helm repo install - command = '{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} {params} {timeout} ' \ - '--name={name} {ns} {model} {ver}'.format(helm=self._helm_command, atomic=atomic_str, - config=config_filename, dir=helm_dir, params=params_str, - timeout=timeout_str, name=kdu_instance, ns=namespace_str, - model=kdu_model, ver=version_str) - self.log.debug('installing: {}'.format(command)) + command = ( + "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} " + "{params} {timeout} --name={name} {ns} {model} {ver}".format( + helm=self._helm_command, + atomic=atomic_str, + config=config_filename, + dir=helm_dir, + params=params_str, + timeout=timeout_str, + name=kdu_instance, + ns=namespace_str, + model=kdu_model, + ver=version_str, + ) + ) + self.log.debug("installing: {}".format(command)) if atomic: # exec helm in a task exec_task = asyncio.ensure_future( - coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + coro_or_future=self._local_async_exec( + command=command, raise_exception_on_error=False + ) ) # write status in another task @@ -398,8 +452,8 @@ class K8sHelmConnector(K8sConnector): cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, db_dict=db_dict, - operation='install', - run_once=False + operation="install", + run_once=False, ) ) @@ -413,7 +467,9 @@ class K8sHelmConnector(K8sConnector): else: - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + output, rc = await self._local_async_exec( + command=command, raise_exception_on_error=False + ) # remove temporal values yaml file if file_to_delete: @@ -424,23 +480,20 @@ class K8sHelmConnector(K8sConnector): cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, db_dict=db_dict, - operation='install', + operation="install", run_once=True, - check_every=0 + check_every=0, ) if rc != 0: - msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + msg = "Error executing command: {}\nOutput: {}".format(command, output) self.log.error(msg) raise K8sException(msg) - self.log.debug('Returning kdu_instance {}'.format(kdu_instance)) + self.log.debug("Returning kdu_instance {}".format(kdu_instance)) return kdu_instance - async def instances_list( - self, - cluster_uuid: str - ) -> list: + async def instances_list(self, cluster_uuid: str) -> list: """ returns a list of deployed releases in a cluster @@ -448,71 +501,90 @@ class K8sHelmConnector(K8sConnector): :return: """ - self.log.debug('list releases for cluster {}'.format(cluster_uuid)) + self.log.debug("list releases for cluster {}".format(cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) - command = '{} --kubeconfig={} --home={} list --output yaml'\ - .format(self._helm_command, config_filename, helm_dir) + command = "{} --kubeconfig={} --home={} list --output yaml".format( + self._helm_command, config_filename, helm_dir + ) - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True + ) if output and len(output) > 0: - return yaml.load(output, Loader=yaml.SafeLoader).get('Releases') + return yaml.load(output, Loader=yaml.SafeLoader).get("Releases") else: return [] async def upgrade( - self, - cluster_uuid: str, - kdu_instance: str, - kdu_model: str = None, - atomic: bool = True, - timeout: float = 300, - params: dict = None, - db_dict: dict = None + self, + cluster_uuid: str, + kdu_instance: str, + kdu_model: str = None, + atomic: bool = True, + timeout: float = 300, + params: dict = None, + db_dict: dict = None, ): - self.log.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid)) + self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) # params to str # params_str = K8sHelmConnector._params_to_set_option(params) - params_str, file_to_delete = self._params_to_file_option(cluster_uuid=cluster_uuid, params=params) + params_str, file_to_delete = self._params_to_file_option( + cluster_uuid=cluster_uuid, params=params + ) - timeout_str = '' + timeout_str = "" if timeout: - timeout_str = '--timeout {}'.format(timeout) + timeout_str = "--timeout {}".format(timeout) # atomic - atomic_str = '' + atomic_str = "" if atomic: - atomic_str = '--atomic' + atomic_str = "--atomic" # version - version_str = '' - if kdu_model and ':' in kdu_model: - parts = kdu_model.split(sep=':') + version_str = "" + if kdu_model and ":" in kdu_model: + parts = kdu_model.split(sep=":") if len(parts) == 2: - version_str = '--version {}'.format(parts[1]) + version_str = "--version {}".format(parts[1]) kdu_model = parts[0] # helm repo upgrade - command = '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\ - .format(self._helm_command, atomic_str, config_filename, helm_dir, - params_str, timeout_str, kdu_instance, kdu_model, version_str) - self.log.debug('upgrading: {}'.format(command)) + command = ( + "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}" + ).format( + self._helm_command, + atomic_str, + config_filename, + helm_dir, + params_str, + timeout_str, + kdu_instance, + kdu_model, + version_str, + ) + self.log.debug("upgrading: {}".format(command)) if atomic: # exec helm in a task exec_task = asyncio.ensure_future( - coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + coro_or_future=self._local_async_exec( + command=command, raise_exception_on_error=False + ) ) # write status in another task status_task = asyncio.ensure_future( @@ -520,8 +592,8 @@ class K8sHelmConnector(K8sConnector): cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, db_dict=db_dict, - operation='upgrade', - run_once=False + operation="upgrade", + run_once=False, ) ) @@ -534,7 +606,9 @@ class K8sHelmConnector(K8sConnector): else: - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=False) + output, rc = await self._local_async_exec( + command=command, raise_exception_on_error=False + ) # remove temporal values yaml file if file_to_delete: @@ -545,46 +619,51 @@ class K8sHelmConnector(K8sConnector): cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, db_dict=db_dict, - operation='upgrade', + operation="upgrade", run_once=True, - check_every=0 + check_every=0, ) if rc != 0: - msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + msg = "Error executing command: {}\nOutput: {}".format(command, output) self.log.error(msg) raise K8sException(msg) # return new revision number - instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + instance = await self.get_instance_info( + cluster_uuid=cluster_uuid, kdu_instance=kdu_instance + ) if instance: - revision = int(instance.get('Revision')) - self.log.debug('New revision: {}'.format(revision)) + revision = int(instance.get("Revision")) + self.log.debug("New revision: {}".format(revision)) return revision else: return 0 async def rollback( - self, - cluster_uuid: str, - kdu_instance: str, - revision=0, - db_dict: dict = None + self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): - self.log.debug('rollback kdu_instance {} to revision {} from cluster {}' - .format(kdu_instance, revision, cluster_uuid)) + self.log.debug( + "rollback kdu_instance {} to revision {} from cluster {}".format( + kdu_instance, revision, cluster_uuid + ) + ) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) - command = '{} rollback --kubeconfig={} --home={} {} {} --wait'\ - .format(self._helm_command, config_filename, helm_dir, kdu_instance, revision) + command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format( + self._helm_command, config_filename, helm_dir, kdu_instance, revision + ) # exec helm in a task exec_task = asyncio.ensure_future( - coro_or_future=self._local_async_exec(command=command, raise_exception_on_error=False) + coro_or_future=self._local_async_exec( + command=command, raise_exception_on_error=False + ) ) # write status in another task status_task = asyncio.ensure_future( @@ -592,8 +671,8 @@ class K8sHelmConnector(K8sConnector): cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, db_dict=db_dict, - operation='rollback', - run_once=False + operation="rollback", + run_once=False, ) ) @@ -610,49 +689,56 @@ class K8sHelmConnector(K8sConnector): cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, db_dict=db_dict, - operation='rollback', + operation="rollback", run_once=True, - check_every=0 + check_every=0, ) if rc != 0: - msg = 'Error executing command: {}\nOutput: {}'.format(command, output) + msg = "Error executing command: {}\nOutput: {}".format(command, output) self.log.error(msg) raise K8sException(msg) # return new revision number - instance = await self.get_instance_info(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) + instance = await self.get_instance_info( + cluster_uuid=cluster_uuid, kdu_instance=kdu_instance + ) if instance: - revision = int(instance.get('Revision')) - self.log.debug('New revision: {}'.format(revision)) + revision = int(instance.get("Revision")) + self.log.debug("New revision: {}".format(revision)) return revision else: return 0 - async def uninstall( - self, - cluster_uuid: str, - kdu_instance: str - ): + async def uninstall(self, cluster_uuid: str, kdu_instance: str): """ - Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen - after all _terminate-config-primitive_ of the VNF are invoked). + Removes an existing KDU instance. It would implicitly use the `delete` call + (this call would happen after all _terminate-config-primitive_ of the VNF + are invoked). :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance to be deleted :return: True if successful """ - self.log.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance, cluster_uuid)) + self.log.debug( + "uninstall kdu_instance {} from cluster {}".format( + kdu_instance, cluster_uuid + ) + ) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) - command = '{} --kubeconfig={} --home={} delete --purge {}'\ - .format(self._helm_command, config_filename, helm_dir, kdu_instance) + command = "{} --kubeconfig={} --home={} delete --purge {}".format( + self._helm_command, config_filename, helm_dir, kdu_instance + ) - output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True) + output, _rc = await self._local_async_exec( + command=command, raise_exception_on_error=True + ) return self._output_to_table(output) @@ -676,62 +762,70 @@ class K8sHelmConnector(K8sConnector): :return: Returns the output of the action """ - raise K8sException("KDUs deployed with Helm don't support actions " - "different from rollback, upgrade and status") + raise K8sException( + "KDUs deployed with Helm don't support actions " + "different from rollback, upgrade and status" + ) - async def inspect_kdu( - self, - kdu_model: str, - repo_url: str = None - ) -> str: + async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str: - self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url)) + self.log.debug( + "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url) + ) - return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url) + return await self._exec_inspect_comand( + inspect_command="", kdu_model=kdu_model, repo_url=repo_url + ) - async def values_kdu( - self, - kdu_model: str, - repo_url: str = None - ) -> str: + async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: - self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url)) + self.log.debug( + "inspect kdu_model values {} from (optional) repo: {}".format( + kdu_model, repo_url + ) + ) - return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url) + return await self._exec_inspect_comand( + inspect_command="values", kdu_model=kdu_model, repo_url=repo_url + ) - async def help_kdu( - self, - kdu_model: str, - repo_url: str = None - ) -> str: + async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: - self.log.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url)) + self.log.debug( + "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url) + ) - return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url) + return await self._exec_inspect_comand( + inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url + ) - async def status_kdu( - self, - cluster_uuid: str, - kdu_instance: str - ) -> str: + async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str: # call internal function return await self._status_kdu( cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True, - return_text=True + return_text=True, ) async def synchronize_repos(self, cluster_uuid: str): self.log.debug("syncronize repos for cluster helm-id: {}",) try: - update_repos_timeout = 300 # max timeout to sync a single repos, more than this is too much - db_k8scluster = self.db.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid}) + update_repos_timeout = ( + 300 # max timeout to sync a single repos, more than this is too much + ) + db_k8scluster = self.db.get_one( + "k8sclusters", {"_admin.helm-chart.id": cluster_uuid} + ) if db_k8scluster: - nbi_repo_list = db_k8scluster.get("_admin").get("helm_chart_repos") or [] - cluster_repo_dict = db_k8scluster.get("_admin").get("helm_charts_added") or {} + nbi_repo_list = ( + db_k8scluster.get("_admin").get("helm_chart_repos") or [] + ) + cluster_repo_dict = ( + db_k8scluster.get("_admin").get("helm_charts_added") or {} + ) # elements that must be deleted deleted_repo_list = [] added_repo_dict = {} @@ -739,103 +833,144 @@ class K8sHelmConnector(K8sConnector): self.log.debug("helm_charts_added: {}".format(cluster_repo_dict)) # obtain repos to add: registered by nbi but not added - repos_to_add = [repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)] + repos_to_add = [ + repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo) + ] # obtain repos to delete: added by cluster but not in nbi list - repos_to_delete = [repo for repo in cluster_repo_dict.keys() if repo not in nbi_repo_list] - - # delete repos: must delete first then add because there may be different repos with same name but + repos_to_delete = [ + repo + for repo in cluster_repo_dict.keys() + if repo not in nbi_repo_list + ] + + # delete repos: must delete first then add because there may be + # different repos with same name but # different id and url self.log.debug("repos to delete: {}".format(repos_to_delete)) for repo_id in repos_to_delete: # try to delete repos try: - repo_delete_task = asyncio.ensure_future(self.repo_remove(cluster_uuid=cluster_uuid, - name=cluster_repo_dict[repo_id])) + repo_delete_task = asyncio.ensure_future( + self.repo_remove( + cluster_uuid=cluster_uuid, + name=cluster_repo_dict[repo_id], + ) + ) await asyncio.wait_for(repo_delete_task, update_repos_timeout) except Exception as e: - self.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id, - cluster_repo_dict[repo_id], str(e))) - # always add to the list of to_delete if there is an error because if is not there deleting raises error + self.warning( + "Error deleting repo, id: {}, name: {}, err_msg: {}".format( + repo_id, cluster_repo_dict[repo_id], str(e) + ) + ) + # always add to the list of to_delete if there is an error + # because if is not there + # deleting raises error deleted_repo_list.append(repo_id) # add repos self.log.debug("repos to add: {}".format(repos_to_add)) - add_task_list = [] for repo_id in repos_to_add: # obtain the repo data from the db - # if there is an error getting the repo in the database we will ignore this repo and continue - # because there is a possible race condition where the repo has been deleted while processing + # if there is an error getting the repo in the database we will + # ignore this repo and continue + # because there is a possible race condition where the repo has + # been deleted while processing db_repo = self.db.get_one("k8srepos", {"_id": repo_id}) - self.log.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id, db_repo["name"], db_repo["url"])) + self.log.debug( + "obtained repo: id, {}, name: {}, url: {}".format( + repo_id, db_repo["name"], db_repo["url"] + ) + ) try: - repo_add_task = asyncio.ensure_future(self.repo_add(cluster_uuid=cluster_uuid, - name=db_repo["name"], url=db_repo["url"], - repo_type="chart")) + repo_add_task = asyncio.ensure_future( + self.repo_add( + cluster_uuid=cluster_uuid, + name=db_repo["name"], + url=db_repo["url"], + repo_type="chart", + ) + ) await asyncio.wait_for(repo_add_task, update_repos_timeout) added_repo_dict[repo_id] = db_repo["name"] - self.log.debug("added repo: id, {}, name: {}".format(repo_id, db_repo["name"])) + self.log.debug( + "added repo: id, {}, name: {}".format( + repo_id, db_repo["name"] + ) + ) except Exception as e: - # deal with error adding repo, adding a repo that already exists does not raise any error - # will not raise error because a wrong repos added by anyone could prevent instantiating any ns - self.log.error("Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e))) + # deal with error adding repo, adding a repo that already + # exists does not raise any error + # will not raise error because a wrong repos added by + # anyone could prevent instantiating any ns + self.log.error( + "Error adding repo id: {}, err_msg: {} ".format( + repo_id, repr(e) + ) + ) return deleted_repo_list, added_repo_dict - else: # else db_k8scluster does not exist - raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid)) + else: # else db_k8scluster does not exist + raise K8sException( + "k8cluster with helm-id : {} not found".format(cluster_uuid) + ) except Exception as e: self.log.error("Error synchronizing repos: {}".format(str(e))) raise K8sException("Error synchronizing repos") """ - ################################################################################################## - ########################################## P R I V A T E ######################################### - ################################################################################################## + #################################################################################### + ################################### P R I V A T E ################################## + #################################################################################### """ async def _exec_inspect_comand( - self, - inspect_command: str, - kdu_model: str, - repo_url: str = None + self, inspect_command: str, kdu_model: str, repo_url: str = None ): - repo_str = '' + repo_str = "" if repo_url: - repo_str = ' --repo {}'.format(repo_url) - idx = kdu_model.find('/') + repo_str = " --repo {}".format(repo_url) + idx = kdu_model.find("/") if idx >= 0: idx += 1 kdu_model = kdu_model[idx:] - inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str) - output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True) + inspect_command = "{} inspect {} {}{}".format( + self._helm_command, inspect_command, kdu_model, repo_str + ) + output, _rc = await self._local_async_exec( + command=inspect_command, encode_utf8=True + ) return output async def _status_kdu( - self, - cluster_uuid: str, - kdu_instance: str, - show_error_log: bool = False, - return_text: bool = False + self, + cluster_uuid: str, + kdu_instance: str, + show_error_log: bool = False, + return_text: bool = False, ): - self.log.debug('status of kdu_instance {}'.format(kdu_instance)) + self.log.debug("status of kdu_instance {}".format(kdu_instance)) # config filename - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths( + cluster_name=cluster_uuid, create_if_not_exist=True + ) - command = '{} --kubeconfig={} --home={} status {} --output yaml'\ - .format(self._helm_command, config_filename, helm_dir, kdu_instance) + command = "{} --kubeconfig={} --home={} status {} --output yaml".format( + self._helm_command, config_filename, helm_dir, kdu_instance + ) output, rc = await self._local_async_exec( command=command, raise_exception_on_error=True, - show_error_log=show_error_log + show_error_log=show_error_log, ) if return_text: @@ -848,111 +983,106 @@ class K8sHelmConnector(K8sConnector): # remove field 'notes' try: - del data.get('info').get('status')['notes'] + del data.get("info").get("status")["notes"] except KeyError: pass # parse field 'resources' try: - resources = str(data.get('info').get('status').get('resources')) + resources = str(data.get("info").get("status").get("resources")) resource_table = self._output_to_table(resources) - data.get('info').get('status')['resources'] = resource_table - except Exception as e: + data.get("info").get("status")["resources"] = resource_table + except Exception: pass return data - async def get_instance_info( - self, - cluster_uuid: str, - kdu_instance: str - ): + async def get_instance_info(self, cluster_uuid: str, kdu_instance: str): instances = await self.instances_list(cluster_uuid=cluster_uuid) for instance in instances: - if instance.get('Name') == kdu_instance: + if instance.get("Name") == kdu_instance: return instance - self.log.debug('Instance {} not found'.format(kdu_instance)) + self.log.debug("Instance {} not found".format(kdu_instance)) return None @staticmethod - def _generate_release_name( - chart_name: str - ): + def _generate_release_name(chart_name: str): # check embeded chart (file or dir) - if chart_name.startswith('/'): + if chart_name.startswith("/"): # extract file or directory name - chart_name = chart_name[chart_name.rfind('/')+1:] + chart_name = chart_name[chart_name.rfind("/") + 1 :] # check URL - elif '://' in chart_name: + elif "://" in chart_name: # extract last portion of URL - chart_name = chart_name[chart_name.rfind('/')+1:] + chart_name = chart_name[chart_name.rfind("/") + 1 :] - name = '' + name = "" for c in chart_name: if c.isalpha() or c.isnumeric(): name += c else: - name += '-' + name += "-" if len(name) > 35: name = name[0:35] # if does not start with alpha character, prefix 'a' if not name[0].isalpha(): - name = 'a' + name + name = "a" + name - name += '-' + name += "-" def get_random_number(): r = random.randrange(start=1, stop=99999999) s = str(r) - s = s.rjust(10, '0') + s = s.rjust(10, "0") return s name = name + get_random_number() return name.lower() async def _store_status( - self, - cluster_uuid: str, - operation: str, - kdu_instance: str, - check_every: float = 10, - db_dict: dict = None, - run_once: bool = False + self, + cluster_uuid: str, + operation: str, + kdu_instance: str, + check_every: float = 10, + db_dict: dict = None, + run_once: bool = False, ): while True: try: await asyncio.sleep(check_every) - detailed_status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance) - status = detailed_status.get('info').get('Description') - self.log.debug('STATUS:\n{}'.format(status)) - self.log.debug('DETAILED STATUS:\n{}'.format(detailed_status)) + detailed_status = await self.status_kdu( + cluster_uuid=cluster_uuid, kdu_instance=kdu_instance + ) + status = detailed_status.get("info").get("Description") + self.log.debug("STATUS:\n{}".format(status)) + self.log.debug("DETAILED STATUS:\n{}".format(detailed_status)) # write status to db result = await self.write_app_status_to_db( db_dict=db_dict, status=str(status), detailed_status=str(detailed_status), - operation=operation) + operation=operation, + ) if not result: - self.log.info('Error writing in database. Task exiting...') + self.log.info("Error writing in database. Task exiting...") return except asyncio.CancelledError: - self.log.debug('Task cancelled') + self.log.debug("Task cancelled") return except Exception as e: - self.log.debug('_store_status exception: {}'.format(str(e))) + self.log.debug("_store_status exception: {}".format(str(e))) pass finally: if run_once: return - async def _is_install_completed( - self, - cluster_uuid: str, - kdu_instance: str - ) -> bool: + async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool: - status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False) + status = await self._status_kdu( + cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False + ) # extract info.status.resources-> str # format: @@ -961,7 +1091,7 @@ class K8sHelmConnector(K8sConnector): # halting-horse-mongodb 0/1 1 0 0s # halting-petit-mongodb 1/1 1 0 0s # blank line - resources = K8sHelmConnector._get_deep(status, ('info', 'status', 'resources')) + resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources")) # convert to table resources = K8sHelmConnector._output_to_table(resources) @@ -973,26 +1103,26 @@ class K8sHelmConnector(K8sConnector): line1 = resources[index] index += 1 # find '==>' in column 0 - if line1[0] == '==>': + if line1[0] == "==>": line2 = resources[index] index += 1 # find READY in column 1 - if line2[1] == 'READY': + if line2[1] == "READY": # read next lines line3 = resources[index] index += 1 while len(line3) > 1 and index < num_lines: ready_value = line3[1] - parts = ready_value.split(sep='/') + parts = ready_value.split(sep="/") current = int(parts[0]) total = int(parts[1]) if current < total: - self.log.debug('NOT READY:\n {}'.format(line3)) + self.log.debug("NOT READY:\n {}".format(line3)) ready = False line3 = resources[index] index += 1 - except Exception as e: + except Exception: pass return ready @@ -1008,7 +1138,7 @@ class K8sHelmConnector(K8sConnector): return None else: target = value - except Exception as e: + except Exception: pass return value @@ -1017,11 +1147,11 @@ class K8sHelmConnector(K8sConnector): def _find_in_lines(p_lines: list, p_key: str) -> str: for line in p_lines: try: - if line.startswith(p_key + ':'): - parts = line.split(':') + if line.startswith(p_key + ":"): + parts = line.split(":") the_value = parts[1].strip() return the_value - except Exception as e: + except Exception: # ignore it pass return None @@ -1031,46 +1161,45 @@ class K8sHelmConnector(K8sConnector): def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str): if params and len(params) > 0: - kube_dir, helm_dir, config_filename, cluster_dir = \ - self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) + self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True) def get_random_number(): r = random.randrange(start=1, stop=99999999) s = str(r) while len(s) < 10: - s = '0' + s + s = "0" + s return s params2 = dict() for key in params: value = params.get(key) - if '!!yaml' in str(value): + if "!!yaml" in str(value): value = yaml.load(value[7:]) params2[key] = value - values_file = get_random_number() + '.yaml' - with open(values_file, 'w') as stream: + values_file = get_random_number() + ".yaml" + with open(values_file, "w") as stream: yaml.dump(params2, stream, indent=4, default_flow_style=False) - return '-f {}'.format(values_file), values_file + return "-f {}".format(values_file), values_file - return '', None + return "", None # params for use in --set option @staticmethod def _params_to_set_option(params: dict) -> str: - params_str = '' + params_str = "" if params and len(params) > 0: start = True for key in params: value = params.get(key, None) if value is not None: if start: - params_str += '--set ' + params_str += "--set " start = False else: - params_str += ',' - params_str += '{}={}'.format(key, value) + params_str += "," + params_str += "{}={}".format(key, value) return params_str @staticmethod @@ -1088,17 +1217,19 @@ class K8sHelmConnector(K8sConnector): output_table = list() lines = output.splitlines(keepends=False) for line in lines: - line = line.replace('\t', ' ') + line = line.replace("\t", " ") line_list = list() output_table.append(line_list) - cells = line.split(sep=' ') + cells = line.split(sep=" ") for cell in cells: cell = cell.strip() if len(cell) > 0: line_list.append(cell) return output_table - def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str): + def _get_paths( + self, cluster_name: str, create_if_not_exist: bool = False + ) -> (str, str, str, str): """ Returns kube and helm directories @@ -1113,81 +1244,78 @@ class K8sHelmConnector(K8sConnector): base = base[:-1] # base dir for cluster - cluster_dir = base + '/' + cluster_name + cluster_dir = base + "/" + cluster_name if create_if_not_exist and not os.path.exists(cluster_dir): - self.log.debug('Creating dir {}'.format(cluster_dir)) + self.log.debug("Creating dir {}".format(cluster_dir)) os.makedirs(cluster_dir) if not os.path.exists(cluster_dir): - msg = 'Base cluster dir {} does not exist'.format(cluster_dir) + msg = "Base cluster dir {} does not exist".format(cluster_dir) self.log.error(msg) raise K8sException(msg) # kube dir - kube_dir = cluster_dir + '/' + '.kube' + kube_dir = cluster_dir + "/" + ".kube" if create_if_not_exist and not os.path.exists(kube_dir): - self.log.debug('Creating dir {}'.format(kube_dir)) + self.log.debug("Creating dir {}".format(kube_dir)) os.makedirs(kube_dir) if not os.path.exists(kube_dir): - msg = 'Kube config dir {} does not exist'.format(kube_dir) + msg = "Kube config dir {} does not exist".format(kube_dir) self.log.error(msg) raise K8sException(msg) # helm home dir - helm_dir = cluster_dir + '/' + '.helm' + helm_dir = cluster_dir + "/" + ".helm" if create_if_not_exist and not os.path.exists(helm_dir): - self.log.debug('Creating dir {}'.format(helm_dir)) + self.log.debug("Creating dir {}".format(helm_dir)) os.makedirs(helm_dir) if not os.path.exists(helm_dir): - msg = 'Helm config dir {} does not exist'.format(helm_dir) + msg = "Helm config dir {} does not exist".format(helm_dir) self.log.error(msg) raise K8sException(msg) - config_filename = kube_dir + '/config' + config_filename = kube_dir + "/config" return kube_dir, helm_dir, config_filename, cluster_dir @staticmethod - def _remove_multiple_spaces(str): - str = str.strip() - while ' ' in str: - str = str.replace(' ', ' ') - return str - - def _local_exec( - self, - command: str - ) -> (str, int): + def _remove_multiple_spaces(strobj): + strobj = strobj.strip() + while " " in strobj: + strobj = strobj.replace(" ", " ") + return strobj + + def _local_exec(self, command: str) -> (str, int): command = K8sHelmConnector._remove_multiple_spaces(command) - self.log.debug('Executing sync local command: {}'.format(command)) + self.log.debug("Executing sync local command: {}".format(command)) # raise exception if fails - output = '' + output = "" try: - output = subprocess.check_output(command, shell=True, universal_newlines=True) + output = subprocess.check_output( + command, shell=True, universal_newlines=True + ) return_code = 0 self.log.debug(output) - except Exception as e: + except Exception: return_code = 1 return output, return_code async def _local_async_exec( - self, - command: str, - raise_exception_on_error: bool = False, - show_error_log: bool = True, - encode_utf8: bool = False + self, + command: str, + raise_exception_on_error: bool = False, + show_error_log: bool = True, + encode_utf8: bool = False, ) -> (str, int): command = K8sHelmConnector._remove_multiple_spaces(command) - self.log.debug('Executing async local command: {}'.format(command)) + self.log.debug("Executing async local command: {}".format(command)) # split command - command = command.split(sep=' ') + command = command.split(sep=" ") try: process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # wait for command terminate @@ -1195,25 +1323,27 @@ class K8sHelmConnector(K8sConnector): return_code = process.returncode - output = '' + output = "" if stdout: - output = stdout.decode('utf-8').strip() + output = stdout.decode("utf-8").strip() # output = stdout.decode() if stderr: - output = stderr.decode('utf-8').strip() + output = stderr.decode("utf-8").strip() # output = stderr.decode() if return_code != 0 and show_error_log: - self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output)) + self.log.debug( + "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output) + ) else: - self.log.debug('Return code: {}'.format(return_code)) + self.log.debug("Return code: {}".format(return_code)) if raise_exception_on_error and return_code != 0: raise K8sException(output) if encode_utf8: - output = output.encode('utf-8').strip() - output = str(output).replace('\\n', '\n') + output = output.encode("utf-8").strip() + output = str(output).replace("\\n", "\n") return output, return_code @@ -1222,21 +1352,19 @@ class K8sHelmConnector(K8sConnector): except K8sException: raise except Exception as e: - msg = 'Exception executing command: {} -> {}'.format(command, e) + msg = "Exception executing command: {} -> {}".format(command, e) self.log.error(msg) if raise_exception_on_error: raise K8sException(e) from e else: - return '', -1 + return "", -1 def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False): # self.log.debug('Checking if file {} exists...'.format(filename)) if os.path.exists(filename): return True else: - msg = 'File {} does not exist'.format(filename) + msg = "File {} does not exist".format(filename) if exception_if_not_exists: # self.log.error(msg) raise K8sException(msg) - - diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py index e01fa0b..7a3bf27 100644 --- a/n2vc/k8s_juju_conn.py +++ b/n2vc/k8s_juju_conn.py @@ -14,37 +14,31 @@ import asyncio import concurrent -from .exceptions import NotImplemented +import os +import uuid -import io import juju -# from juju.bundle import BundleHandler from juju.controller import Controller -from juju.model import Model -from juju.errors import JujuAPIError, JujuError from n2vc.exceptions import K8sException - from n2vc.k8s_conn import K8sConnector +import yaml -import os +from .exceptions import MethodNotImplemented + + +# from juju.bundle import BundleHandler # import re # import ssl # from .vnf import N2VC - -import uuid -import yaml - - class K8sJujuConnector(K8sConnector): - def __init__( - self, - fs: object, - db: object, - kubectl_command: str = '/usr/bin/kubectl', - juju_command: str = '/usr/bin/juju', - log: object = None, - on_update_db=None, + self, + fs: object, + db: object, + kubectl_command: str = "/usr/bin/kubectl", + juju_command: str = "/usr/bin/juju", + log: object = None, + on_update_db=None, ): """ @@ -56,14 +50,11 @@ class K8sJujuConnector(K8sConnector): # parent class K8sConnector.__init__( - self, - db, - log=log, - on_update_db=on_update_db, + self, db, log=log, on_update_db=on_update_db, ) self.fs = fs - self.log.debug('Initializing K8S Juju connector') + self.log.debug("Initializing K8S Juju connector") self.authenticated = False self.models = {} @@ -71,23 +62,27 @@ class K8sJujuConnector(K8sConnector): self.juju_command = juju_command self.juju_secret = "" - self.log.debug('K8S Juju connector initialized') + self.log.debug("K8S Juju connector initialized") """Initialization""" + async def init_env( self, k8s_creds: str, - namespace: str = 'kube-system', + namespace: str = "kube-system", reuse_cluster_uuid: str = None, ) -> (str, bool): """ It prepares a given K8s cluster environment to run Juju bundles. - :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config' - :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used + :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid + '.kube/config' + :param namespace: optional namespace to be used for juju. By default, + 'kube-system' will be used :param reuse_cluster_uuid: existing cluster uuid for reuse - :return: uuid of the K8s cluster and True if connector has installed some software in the cluster - (on error, an exception will be raised) + :return: uuid of the K8s cluster and True if connector has installed some + software in the cluster + (on error, an exception will be raised) """ """Bootstrapping @@ -155,38 +150,34 @@ class K8sJujuConnector(K8sConnector): # Parse ~/.local/share/juju/controllers.yaml # controllers.testing.api-endpoints|ca-cert|uuid self.log.debug("Getting controller endpoints") - with open(os.path.expanduser( - "~/.local/share/juju/controllers.yaml" - )) as f: + with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f: controllers = yaml.load(f, Loader=yaml.Loader) - controller = controllers['controllers'][cluster_uuid] - endpoints = controller['api-endpoints'] + controller = controllers["controllers"][cluster_uuid] + endpoints = controller["api-endpoints"] self.juju_endpoint = endpoints[0] - self.juju_ca_cert = controller['ca-cert'] + self.juju_ca_cert = controller["ca-cert"] # Parse ~/.local/share/juju/accounts # controllers.testing.user|password self.log.debug("Getting accounts") - with open(os.path.expanduser( - "~/.local/share/juju/accounts.yaml" - )) as f: + with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f: controllers = yaml.load(f, Loader=yaml.Loader) - controller = controllers['controllers'][cluster_uuid] + controller = controllers["controllers"][cluster_uuid] - self.juju_user = controller['user'] - self.juju_secret = controller['password'] + self.juju_user = controller["user"] + self.juju_secret = controller["password"] # raise Exception("EOL") self.juju_public_key = None config = { - 'endpoint': self.juju_endpoint, - 'username': self.juju_user, - 'secret': self.juju_secret, - 'cacert': self.juju_ca_cert, - 'namespace': namespace, - 'loadbalancer': loadbalancer, + "endpoint": self.juju_endpoint, + "username": self.juju_user, + "secret": self.juju_secret, + "cacert": self.juju_ca_cert, + "namespace": namespace, + "loadbalancer": loadbalancer, } # Store the cluster configuration so it @@ -200,10 +191,10 @@ class K8sJujuConnector(K8sConnector): config = self.get_config(cluster_uuid) - self.juju_endpoint = config['endpoint'] - self.juju_user = config['username'] - self.juju_secret = config['secret'] - self.juju_ca_cert = config['cacert'] + self.juju_endpoint = config["endpoint"] + self.juju_user = config["username"] + self.juju_secret = config["secret"] + self.juju_ca_cert = config["cacert"] self.juju_public_key = None # Login to the k8s cluster @@ -211,52 +202,44 @@ class K8sJujuConnector(K8sConnector): await self.login(cluster_uuid) # We're creating a new cluster - #print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)) - #model = await self.get_model( + # print("Getting model {}".format(self.get_namespace(cluster_uuid), + # cluster_uuid=cluster_uuid)) + # model = await self.get_model( # self.get_namespace(cluster_uuid), # cluster_uuid=cluster_uuid - #) + # ) - ## Disconnect from the model - #if model and model.is_connected(): + # Disconnect from the model + # if model and model.is_connected(): # await model.disconnect() return cluster_uuid, True """Repo Management""" + async def repo_add( - self, - name: str, - url: str, - type: str = "charm", + self, name: str, url: str, _type: str = "charm", ): - raise NotImplemented() + raise MethodNotImplemented() async def repo_list(self): - raise NotImplemented() + raise MethodNotImplemented() async def repo_remove( - self, - name: str, + self, name: str, ): - raise NotImplemented() + raise MethodNotImplemented() - async def synchronize_repos( - self, - cluster_uuid: str, - name: str - ): + async def synchronize_repos(self, cluster_uuid: str, name: str): """ Returns None as currently add_repo is not implemented """ return None """Reset""" + async def reset( - self, - cluster_uuid: str, - force: bool = False, - uninstall_sw: bool = False + self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False ) -> bool: """Reset a cluster @@ -275,10 +258,7 @@ class K8sJujuConnector(K8sConnector): namespace = self.get_namespace(cluster_uuid) if await self.has_model(namespace): self.log.debug("[reset] Destroying model") - await self.controller.destroy_model( - namespace, - destroy_storage=True - ) + await self.controller.destroy_model(namespace, destroy_storage=True) # Disconnect from the controller self.log.debug("[reset] Disconnecting controller") @@ -308,7 +288,7 @@ class K8sJujuConnector(K8sConnector): params: dict = None, db_dict: dict = None, kdu_name: str = None, - namespace: str = None + namespace: str = None, ) -> bool: """Install a bundle @@ -350,7 +330,8 @@ class K8sJujuConnector(K8sConnector): "Juju bundle that models the KDU, in any of the following ways: - / - - - + - - """ @@ -389,17 +370,17 @@ class K8sJujuConnector(K8sConnector): self.log.debug("Waiting for all units to be active...") await model.block_until( lambda: all( - unit.agent_status == 'idle' - and application.status in ['active', 'unknown'] - and unit.workload_status in [ - 'active', 'unknown' - ] for unit in application.units + unit.agent_status == "idle" + and application.status in ["active", "unknown"] + and unit.workload_status in ["active", "unknown"] + for unit in application.units ), - timeout=timeout + timeout=timeout, ) self.log.debug("All units active.") - except concurrent.futures._base.TimeoutError: # TODO use asyncio.TimeoutError + # TODO use asyncio.TimeoutError + except concurrent.futures._base.TimeoutError: os.chdir(previous_workdir) self.log.debug("[install] Timeout exceeded; resetting cluster") await self.reset(cluster_uuid) @@ -415,10 +396,7 @@ class K8sJujuConnector(K8sConnector): return kdu_instance raise Exception("Unable to install") - async def instances_list( - self, - cluster_uuid: str - ) -> list: + async def instances_list(self, cluster_uuid: str) -> list: """ returns a list of deployed releases in a cluster @@ -461,7 +439,7 @@ class K8sJujuConnector(K8sConnector): namespace = self.get_namespace(cluster_uuid) model = await self.get_model(namespace, cluster_uuid=cluster_uuid) - with open(kdu_model, 'r') as f: + with open(kdu_model, "r") as f: bundle = yaml.safe_load(f) """ @@ -483,31 +461,29 @@ class K8sJujuConnector(K8sConnector): } """ # TODO: This should be returned in an agreed-upon format - for name in bundle['applications']: + for name in bundle["applications"]: self.log.debug(model.applications) application = model.applications[name] self.log.debug(application) - path = bundle['applications'][name]['charm'] + path = bundle["applications"][name]["charm"] try: await application.upgrade_charm(switch=path) except juju.errors.JujuError as ex: - if 'already running charm' in str(ex): + if "already running charm" in str(ex): # We're already running this version pass await model.disconnect() return True - raise NotImplemented() + raise MethodNotImplemented() """Rollback""" + async def rollback( - self, - cluster_uuid: str, - kdu_instance: str, - revision: int = 0, + self, cluster_uuid: str, kdu_instance: str, revision: int = 0, ) -> str: """Rollback a model @@ -519,14 +495,11 @@ class K8sJujuConnector(K8sConnector): :return: If successful, returns the revision of active KDU instance, or raises an exception """ - raise NotImplemented() + raise MethodNotImplemented() """Deletion""" - async def uninstall( - self, - cluster_uuid: str, - kdu_instance: str - ) -> bool: + + async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool: """Uninstall a KDU instance :param cluster_uuid str: The UUID of the cluster @@ -572,11 +545,15 @@ class K8sJujuConnector(K8sConnector): await self.login(cluster_uuid) if not params or "application-name" not in params: - raise K8sException("Missing application-name argument, \ - argument needed for K8s actions") + raise K8sException( + "Missing application-name argument, \ + argument needed for K8s actions" + ) try: - self.log.debug("[exec_primitive] Getting model " - "kdu_instance: {}".format(kdu_instance)) + self.log.debug( + "[exec_primitive] Getting model " + "kdu_instance: {}".format(kdu_instance) + ) model = await self.get_model(kdu_instance, cluster_uuid) @@ -607,7 +584,9 @@ class K8sJujuConnector(K8sConnector): ) if status != "completed": - raise K8sException("status is not completed: {} output: {}".format(status, output)) + raise K8sException( + "status is not completed: {} output: {}".format(status, output) + ) return output @@ -617,10 +596,8 @@ class K8sJujuConnector(K8sConnector): raise K8sException(message=error_msg) """Introspection""" - async def inspect_kdu( - self, - kdu_model: str, - ) -> dict: + + async def inspect_kdu(self, kdu_model: str,) -> dict: """Inspect a KDU Inspects a bundle and returns a dictionary of config parameters and @@ -633,7 +610,7 @@ class K8sJujuConnector(K8sConnector): """ kdu = {} - with open(kdu_model, 'r') as f: + with open(kdu_model, "r") as f: bundle = yaml.safe_load(f) """ @@ -655,14 +632,11 @@ class K8sJujuConnector(K8sConnector): } """ # TODO: This should be returned in an agreed-upon format - kdu = bundle['applications'] + kdu = bundle["applications"] return kdu - async def help_kdu( - self, - kdu_model: str, - ) -> str: + async def help_kdu(self, kdu_model: str,) -> str: """View the README If available, returns the README of the bundle. @@ -673,21 +647,17 @@ class K8sJujuConnector(K8sConnector): """ readme = None - files = ['README', 'README.txt', 'README.md'] + files = ["README", "README.txt", "README.md"] path = os.path.dirname(kdu_model) for file in os.listdir(path): if file in files: - with open(file, 'r') as f: + with open(file, "r") as f: readme = f.read() break return readme - async def status_kdu( - self, - cluster_uuid: str, - kdu_instance: str, - ) -> dict: + async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict: """Get the status of the KDU Get the current status of the KDU instance. @@ -700,7 +670,9 @@ class K8sJujuConnector(K8sConnector): """ status = {} - model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid) + model = await self.get_model( + self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid + ) # model = await self.get_model_by_uuid(cluster_uuid) if model: @@ -709,9 +681,7 @@ class K8sJujuConnector(K8sConnector): for name in model_status.applications: application = model_status.applications[name] - status[name] = { - 'status': application['status']['status'] - } + status[name] = {"status": application["status"]["status"]} if model.is_connected(): await model.disconnect() @@ -719,11 +689,7 @@ class K8sJujuConnector(K8sConnector): return status # Private methods - async def add_k8s( - self, - cloud_name: str, - credentials: str, - ) -> bool: + async def add_k8s(self, cloud_name: str, credentials: str,) -> bool: """Add a k8s cloud to Juju Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a @@ -751,7 +717,7 @@ class K8sJujuConnector(K8sConnector): await process.stdin.drain() process.stdin.close() - stdout, stderr = await process.communicate() + _stdout, stderr = await process.communicate() return_code = process.returncode @@ -762,11 +728,7 @@ class K8sJujuConnector(K8sConnector): return True - async def add_model( - self, - model_name: str, - cluster_uuid: str, - ) -> juju.model.Model: + async def add_model(self, model_name: str, cluster_uuid: str,) -> juju.model.Model: """Adds a model to the controller Adds a new model to the Juju controller @@ -778,11 +740,12 @@ class K8sJujuConnector(K8sConnector): if not self.authenticated: await self.login(cluster_uuid) - self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)) + self.log.debug( + "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid) + ) try: model = await self.controller.add_model( - model_name, - config={'authorized-keys': self.juju_public_key} + model_name, config={"authorized-keys": self.juju_public_key} ) except Exception as ex: self.log.debug(ex) @@ -792,10 +755,7 @@ class K8sJujuConnector(K8sConnector): return model async def bootstrap( - self, - cloud_name: str, - cluster_uuid: str, - loadbalancer: bool + self, cloud_name: str, cluster_uuid: str, loadbalancer: bool ) -> bool: """Bootstrap a Kubernetes controller @@ -811,35 +771,38 @@ class K8sJujuConnector(K8sConnector): cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid] else: """ - For public clusters, specify that the controller service is using a LoadBalancer. + For public clusters, specify that the controller service is using a + LoadBalancer. """ - cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"] - - self.log.debug("Bootstrapping controller {} in cloud {}".format( - cluster_uuid, cloud_name - )) + cmd = [ + self.juju_command, + "bootstrap", + cloud_name, + cluster_uuid, + "--config", + "controller-service-type=loadbalancer", + ] + + self.log.debug( + "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name) + ) process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, stderr = await process.communicate() + _stdout, stderr = await process.communicate() return_code = process.returncode if return_code > 0: # - if b'already exists' not in stderr: + if b"already exists" not in stderr: raise Exception(stderr) return True - async def destroy_controller( - self, - cluster_uuid: str - ) -> bool: + async def destroy_controller(self, cluster_uuid: str) -> bool: """Destroy a Kubernetes controller Destroy an existing Kubernetes controller. @@ -853,28 +816,23 @@ class K8sJujuConnector(K8sConnector): "--destroy-all-models", "--destroy-storage", "-y", - cluster_uuid + cluster_uuid, ] process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, stderr = await process.communicate() + _stdout, stderr = await process.communicate() return_code = process.returncode if return_code > 0: # - if 'already exists' not in stderr: + if "already exists" not in stderr: raise Exception(stderr) - def get_config( - self, - cluster_uuid: str, - ) -> dict: + def get_config(self, cluster_uuid: str,) -> dict: """Get the cluster configuration Gets the configuration of the cluster @@ -884,21 +842,15 @@ class K8sJujuConnector(K8sConnector): """ cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid) if os.path.exists(cluster_config): - with open(cluster_config, 'r') as f: + with open(cluster_config, "r") as f: config = yaml.safe_load(f.read()) return config else: raise Exception( - "Unable to locate configuration for cluster {}".format( - cluster_uuid - ) + "Unable to locate configuration for cluster {}".format(cluster_uuid) ) - async def get_model( - self, - model_name: str, - cluster_uuid: str, - ) -> juju.model.Model: + async def get_model(self, model_name: str, cluster_uuid: str,) -> juju.model.Model: """Get a model from the Juju Controller. Note: Model objects returned must call disconnected() before it goes @@ -914,15 +866,10 @@ class K8sJujuConnector(K8sConnector): models = await self.controller.list_models() if model_name in models: self.log.debug("Found model: {}".format(model_name)) - model = await self.controller.get_model( - model_name - ) + model = await self.controller.get_model(model_name) return model - def get_namespace( - self, - cluster_uuid: str, - ) -> str: + def get_namespace(self, cluster_uuid: str,) -> str: """Get the namespace UUID Gets the namespace's unique name @@ -932,18 +879,15 @@ class K8sJujuConnector(K8sConnector): config = self.get_config(cluster_uuid) # Make sure the name is in the config - if 'namespace' not in config: + if "namespace" not in config: raise Exception("Namespace not found.") # TODO: We want to make sure this is unique to the cluster, in case # the cluster is being reused. # Consider pre/appending the cluster id to the namespace string - return config['namespace'] + return config["namespace"] - async def has_model( - self, - model_name: str - ) -> bool: + async def has_model(self, model_name: str) -> bool: """Check if a model exists in the controller Checks to see if a model exists in the connected Juju controller. @@ -957,10 +901,7 @@ class K8sJujuConnector(K8sConnector): return True return False - def is_local_k8s( - self, - credentials: str, - ) -> bool: + def is_local_k8s(self, credentials: str,) -> bool: """Check if a cluster is local Checks if a cluster is running in the local host @@ -973,9 +914,9 @@ class K8sJujuConnector(K8sConnector): host_ip = os.getenv("OSMLCM_VCA_APIPROXY") if creds and host_ip: - for cluster in creds['clusters']: - if 'server' in cluster['cluster']: - if host_ip in cluster['cluster']['server']: + for cluster in creds["clusters"]: + if "server" in cluster["cluster"]: + if host_ip in cluster["cluster"]["server"]: return True return False @@ -991,10 +932,10 @@ class K8sJujuConnector(K8sConnector): # Test: Make sure we have the credentials loaded config = self.get_config(cluster_uuid) - self.juju_endpoint = config['endpoint'] - self.juju_user = config['username'] - self.juju_secret = config['secret'] - self.juju_ca_cert = config['cacert'] + self.juju_endpoint = config["endpoint"] + self.juju_user = config["username"] + self.juju_secret = config["secret"] + self.juju_ca_cert = config["cacert"] self.juju_public_key = None self.controller = Controller() @@ -1002,9 +943,7 @@ class K8sJujuConnector(K8sConnector): if self.juju_secret: self.log.debug( "Connecting to controller... ws://{} as {}/{}".format( - self.juju_endpoint, - self.juju_user, - self.juju_secret, + self.juju_endpoint, self.juju_user, self.juju_secret, ) ) try: @@ -1035,18 +974,13 @@ class K8sJujuConnector(K8sConnector): await self.models[model].disconnect() if self.controller: - self.log.debug("Disconnecting controller {}".format( - self.controller - )) + self.log.debug("Disconnecting controller {}".format(self.controller)) await self.controller.disconnect() self.controller = None self.authenticated = False - async def remove_cloud( - self, - cloud_name: str, - ) -> bool: + async def remove_cloud(self, cloud_name: str,) -> bool: """Remove a k8s cloud from Juju Removes a Kubernetes cloud from Juju. @@ -1059,12 +993,10 @@ class K8sJujuConnector(K8sConnector): # Remove the bootstrapped controller cmd = [self.juju_command, "remove-k8s", "--client", cloud_name] process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, stderr = await process.communicate() + _stdout, stderr = await process.communicate() return_code = process.returncode @@ -1074,12 +1006,10 @@ class K8sJujuConnector(K8sConnector): # Remove the cloud from the local config cmd = [self.juju_command, "remove-cloud", "--client", cloud_name] process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, stderr = await process.communicate() + _stdout, stderr = await process.communicate() return_code = process.returncode @@ -1088,11 +1018,7 @@ class K8sJujuConnector(K8sConnector): return True - async def set_config( - self, - cluster_uuid: str, - config: dict, - ) -> bool: + async def set_config(self, cluster_uuid: str, config: dict,) -> bool: """Save the cluster configuration Saves the cluster information to the file store @@ -1105,7 +1031,7 @@ class K8sJujuConnector(K8sConnector): cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid) if not os.path.exists(cluster_config): self.log.debug("Writing config to {}".format(cluster_config)) - with open(cluster_config, 'w') as f: + with open(cluster_config, "w") as f: f.write(yaml.dump(config, Dumper=yaml.Dumper)) return True diff --git a/n2vc/loggable.py b/n2vc/loggable.py index 87a645d..d588a1d 100644 --- a/n2vc/loggable.py +++ b/n2vc/loggable.py @@ -21,24 +21,18 @@ ## -import logging import asyncio -import time -import inspect import datetime -import threading # only for logging purposes (not for using threads) +import inspect +import logging +import threading # only for logging purposes (not for using threads) +import time class Loggable: + def __init__(self, log, log_to_console: bool = False, prefix: str = ""): - def __init__( - self, - log, - log_to_console: bool = False, - prefix: str = '' - ): - - self._last_log_time = None # used for time increment in logging + self._last_log_time = None # used for time increment in logging self._log_to_console = log_to_console self._prefix = prefix if log is not None: @@ -47,21 +41,21 @@ class Loggable: self.log = logging.getLogger(__name__) def debug(self, msg: str): - self._log_msg(log_level='DEBUG', msg=msg) + self._log_msg(log_level="DEBUG", msg=msg) def info(self, msg: str): - self._log_msg(log_level='INFO', msg=msg) + self._log_msg(log_level="INFO", msg=msg) def warning(self, msg: str): - self._log_msg(log_level='WARNING', msg=msg) + self._log_msg(log_level="WARNING", msg=msg) def error(self, msg: str): - self._log_msg(log_level='ERROR', msg=msg) + self._log_msg(log_level="ERROR", msg=msg) def critical(self, msg: str): - self._log_msg(log_level='CRITICAL', msg=msg) + self._log_msg(log_level="CRITICAL", msg=msg) - ################################################################################################## + #################################################################################### def _log_msg(self, log_level: str, msg: str): """Generic log method""" @@ -72,41 +66,41 @@ class Loggable: level=3, include_path=False, include_thread=False, - include_coroutine=True + include_coroutine=True, ) if self._log_to_console: print(msg) else: if self.log is not None: - if log_level == 'DEBUG': + if log_level == "DEBUG": self.log.debug(msg) - elif log_level == 'INFO': + elif log_level == "INFO": self.log.info(msg) - elif log_level == 'WARNING': + elif log_level == "WARNING": self.log.warning(msg) - elif log_level == 'ERROR': + elif log_level == "ERROR": self.log.error(msg) - elif log_level == 'CRITICAL': + elif log_level == "CRITICAL": self.log.critical(msg) def _format_log( - self, - log_level: str, - msg: str = '', - obj: object = None, - level: int = None, - include_path: bool = False, - include_thread: bool = False, - include_coroutine: bool = True + self, + log_level: str, + msg: str = "", + obj: object = None, + level: int = None, + include_path: bool = False, + include_thread: bool = False, + include_coroutine: bool = True, ) -> str: # time increment from last log now = time.perf_counter() if self._last_log_time is None: - time_str = ' (+0.000)' + time_str = " (+0.000)" else: diff = round(now - self._last_log_time, 3) - time_str = ' (+{})'.format(diff) + time_str = " (+{})".format(diff) self._last_log_time = now if level is None: @@ -119,49 +113,69 @@ class Loggable: lineno = fi.lineno # filename without path if not include_path: - i = filename.rfind('/') + i = filename.rfind("/") if i > 0: - filename = filename[i+1:] + filename = filename[i + 1 :] # datetime - dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') + dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") dt = dt + time_str # dt = time_str # logger already shows datetime # current thread if include_thread: - thread_name = 'th:{}'.format(threading.current_thread().getName()) + thread_name = "th:{}".format(threading.current_thread().getName()) else: - thread_name = '' + thread_name = "" # current coroutine - coroutine_id = '' + coroutine_id = "" if include_coroutine: try: if asyncio.Task.current_task() is not None: + def print_cor_name(c): import inspect + try: for m in inspect.getmembers(c): - if m[0] == '__name__': + if m[0] == "__name__": return m[1] except Exception: pass + coro = asyncio.Task.current_task()._coro - coroutine_id = 'coro-{} {}()'.format(hex(id(coro))[2:], print_cor_name(coro)) + coroutine_id = "coro-{} {}()".format( + hex(id(coro))[2:], print_cor_name(coro) + ) except Exception: - coroutine_id = '' + coroutine_id = "" # classname if obj is not None: obj_type = obj.__class__.__name__ # type: str - log_msg = \ - '{} {} {} {} {}::{}.{}():{}\n{}'\ - .format(self._prefix, dt, thread_name, coroutine_id, filename, obj_type, func, lineno, str(msg)) + log_msg = "{} {} {} {} {}::{}.{}():{}\n{}".format( + self._prefix, + dt, + thread_name, + coroutine_id, + filename, + obj_type, + func, + lineno, + str(msg), + ) else: - log_msg = \ - '{} {} {} {} {}::{}():{}\n{}'\ - .format(self._prefix, dt, thread_name, coroutine_id, filename, func, lineno, str(msg)) + log_msg = "{} {} {} {} {}::{}():{}\n{}".format( + self._prefix, + dt, + thread_name, + coroutine_id, + filename, + func, + lineno, + str(msg), + ) return log_msg diff --git a/n2vc/n2vc_conn.py b/n2vc/n2vc_conn.py index 6819335..c0bb558 100644 --- a/n2vc/n2vc_conn.py +++ b/n2vc/n2vc_conn.py @@ -23,25 +23,26 @@ import abc import asyncio +from enum import Enum +from http import HTTPStatus import os -import subprocess import shlex +import subprocess import time -from enum import Enum -from http import HTTPStatus -from n2vc.loggable import Loggable + from n2vc.exceptions import N2VCBadArgumentsException +from osm_common.dbmongo import DbException import yaml -from osm_common.dbmongo import DbException +from n2vc.loggable import Loggable class N2VCDeploymentStatus(Enum): - PENDING = 'pending' - RUNNING = 'running' - COMPLETED = 'completed' - FAILED = 'failed' - UNKNOWN = 'unknown' + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + UNKNOWN = "unknown" class N2VCConnector(abc.ABC, Loggable): @@ -51,35 +52,39 @@ class N2VCConnector(abc.ABC, Loggable): """ """ - ################################################################################################## - ########################################## P U B L I C ########################################### - ################################################################################################## + #################################################################################### + ################################### P U B L I C #################################### + #################################################################################### """ def __init__( - self, - db: object, - fs: object, - log: object, - loop: object, - url: str, - username: str, - vca_config: dict, - on_update_db=None + self, + db: object, + fs: object, + log: object, + loop: object, + url: str, + username: str, + vca_config: dict, + on_update_db=None, ): """Initialize N2VC abstract connector. It defines de API for VCA connectors :param object db: Mongo object managing the MongoDB (repo common DbBase) - :param object fs: FileSystem object managing the package artifacts (repo common FsBase) + :param object fs: FileSystem object managing the package artifacts (repo common + FsBase) :param object log: the logging object to log to :param object loop: the loop to use for asyncio (default current thread loop) - :param str url: a string that how to connect to the VCA (if needed, IP and port can be obtained from there) + :param str url: a string that how to connect to the VCA (if needed, IP and port + can be obtained from there) :param str username: the username to authenticate with VCA - :param dict vca_config: Additional parameters for the specific VCA. For example, for juju it will contain: + :param dict vca_config: Additional parameters for the specific VCA. For example, + for juju it will contain: secret: The password to authenticate with public_key: The contents of the juju public SSH key ca_cert str: The CA certificate used to authenticate - :param on_update_db: callback called when n2vc connector updates database. Received arguments: + :param on_update_db: callback called when n2vc connector updates database. + Received arguments: table: e.g. "nsrs" filter: e.g. {_id: } path: e.g. "_admin.deployed.VCA.3." @@ -87,17 +92,26 @@ class N2VCConnector(abc.ABC, Loggable): """ # parent class - Loggable.__init__(self, log=log, log_to_console=True, prefix='\nN2VC') + Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC") # check arguments if db is None: - raise N2VCBadArgumentsException('Argument db is mandatory', ['db']) + raise N2VCBadArgumentsException("Argument db is mandatory", ["db"]) if fs is None: - raise N2VCBadArgumentsException('Argument fs is mandatory', ['fs']) - - self.log.info('url={}, username={}, vca_config={}'.format( - url, username, {k: v for k, v in vca_config.items() if k not in ("host", "port", "user", "secret", - "public_key", "ca_cert")})) + raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"]) + + self.log.info( + "url={}, username={}, vca_config={}".format( + url, + username, + { + k: v + for k, v in vca_config.items() + if k + not in ("host", "port", "user", "secret", "public_key", "ca_cert") + }, + ) + ) # store arguments into self self.db = db @@ -125,21 +139,19 @@ class N2VCConnector(abc.ABC, Loggable): def get_public_key(self) -> str: """Get the VCA ssh-public-key - Returns the SSH public key from local mahine, to be injected into virtual machines to - be managed by the VCA. + Returns the SSH public key from local mahine, to be injected into virtual + machines to be managed by the VCA. First run, a ssh keypair will be created. The public key is injected into a VM so that we can provision the - machine with Juju, after which Juju will communicate with the VM + machine with Juju, after which Juju will communicate with the VM directly via the juju agent. """ - public_key = '' - # Find the path where we expect our key lives (~/.ssh) - homedir = os.environ.get('HOME') + homedir = os.environ.get("HOME") if not homedir: - self.warning('No HOME environment variable, using /tmp') - homedir = '/tmp' + self.warning("No HOME environment variable, using /tmp") + homedir = "/tmp" sshdir = "{}/.ssh".format(homedir) if not os.path.exists(sshdir): os.mkdir(sshdir) @@ -150,9 +162,7 @@ class N2VCConnector(abc.ABC, Loggable): # If we don't have a key generated, then we have to generate it using ssh-keygen if not os.path.exists(self.private_key_path): cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format( - "rsa", - "4096", - self.private_key_path + "rsa", "4096", self.private_key_path ) # run command with arguments subprocess.check_output(shlex.split(cmd)) @@ -170,20 +180,24 @@ class N2VCConnector(abc.ABC, Loggable): db_dict: dict, reuse_ee_id: str = None, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> (str, dict): - """Create an Execution Environment. Returns when it is created or raises an exception on failing + """Create an Execution Environment. Returns when it is created or raises an + exception on failing :param str namespace: Contains a dot separate string. LCM will use: []...[-] :param dict db_dict: where to write to database when the status changes. It contains a dictionary with {collection: str, filter: {}, path: str}, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} - :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an older environment + e.g. {collection: "nsrs", filter: {_id: , path: + "_admin.deployed.VCA.3"} + :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an + older environment :param float progress_timeout: :param float total_timeout: :returns str, dict: id of the new execution environment and credentials for it - (credentials can contains hostname, username, etc depending on underlying cloud) + (credentials can contains hostname, username, etc depending on + underlying cloud) """ @abc.abstractmethod @@ -193,17 +207,20 @@ class N2VCConnector(abc.ABC, Loggable): credentials: dict, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> str: """ Register an existing execution environment at the VCA :param str namespace: same as create_execution_environment method - :param dict credentials: credentials to access the existing execution environment - (it can contains hostname, username, path to private key, etc depending on underlying cloud) + :param dict credentials: credentials to access the existing execution + environment + (it can contains hostname, username, path to private key, etc depending on + underlying cloud) :param dict db_dict: where to write to database when the status changes. It contains a dictionary with {collection: str, filter: {}, path: str}, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float progress_timeout: :param float total_timeout: :returns str: id of the execution environment @@ -216,19 +233,22 @@ class N2VCConnector(abc.ABC, Loggable): artifact_path: str, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ): """ Install the software inside the execution environment identified by ee_id - :param str ee_id: the id of the execution environment returned by create_execution_environment - or register_execution_environment - :param str artifact_path: where to locate the artifacts (parent folder) using the self.fs - the final artifact path will be a combination of this artifact_path and additional string from - the config_dict (e.g. charm name) + :param str ee_id: the id of the execution environment returned by + create_execution_environment or register_execution_environment + :param str artifact_path: where to locate the artifacts (parent folder) using + the self.fs + the final artifact path will be a combination of this artifact_path and + additional string from the config_dict (e.g. charm name) :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float progress_timeout: :param float total_timeout: """ @@ -239,34 +259,34 @@ class N2VCConnector(abc.ABC, Loggable): ee_id: str, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> str: """ - Generate a priv/pub key pair in the execution environment and return the public key + Generate a priv/pub key pair in the execution environment and return the public + key - :param str ee_id: the id of the execution environment returned by create_execution_environment - or register_execution_environment + :param str ee_id: the id of the execution environment returned by + create_execution_environment or register_execution_environment :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float progress_timeout: :param float total_timeout: :returns: public key of the execution environment - For the case of juju proxy charm ssh-layered, it is the one returned by 'get-ssh-public-key' - primitive. + For the case of juju proxy charm ssh-layered, it is the one + returned by 'get-ssh-public-key' primitive. It raises a N2VC exception if fails """ @abc.abstractmethod async def add_relation( - self, - ee_id_1: str, - ee_id_2: str, - endpoint_1: str, - endpoint_2: str + self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str ): """ - Add a relation between two Execution Environments (using their associated endpoints). + Add a relation between two Execution Environments (using their associated + endpoints). :param str ee_id_1: The id of the first execution environment :param str ee_id_2: The id of the second execution environment @@ -276,49 +296,43 @@ class N2VCConnector(abc.ABC, Loggable): # TODO @abc.abstractmethod - async def remove_relation( - self - ): + async def remove_relation(self): """ """ # TODO @abc.abstractmethod - async def deregister_execution_environments( - self - ): + async def deregister_execution_environments(self): """ """ @abc.abstractmethod async def delete_namespace( - self, - namespace: str, - db_dict: dict = None, - total_timeout: float = None + self, namespace: str, db_dict: dict = None, total_timeout: float = None ): """ Remove a network scenario and its execution environments :param namespace: []. :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float total_timeout: """ @abc.abstractmethod async def delete_execution_environment( - self, - ee_id: str, - db_dict: dict = None, - total_timeout: float = None + self, ee_id: str, db_dict: dict = None, total_timeout: float = None ): """ Delete an execution environment :param str ee_id: id of the execution environment to delete :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float total_timeout: """ @@ -330,18 +344,22 @@ class N2VCConnector(abc.ABC, Loggable): params_dict: dict, db_dict: dict = None, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> str: """ Execute a primitive in the execution environment - :param str ee_id: the one returned by create_execution_environment or register_execution_environment - :param str primitive_name: must be one defined in the software. There is one called 'config', - where, for the proxy case, the 'credentials' of VM are provided + :param str ee_id: the one returned by create_execution_environment or + register_execution_environment + :param str primitive_name: must be one defined in the software. There is one + called 'config', where, for the proxy case, the 'credentials' of VM are + provided :param dict params_dict: parameters of the action :param dict db_dict: where to write into database when the status changes. - It contains a dict with {collection: , filter: {}, path: }, - e.g. {collection: "nsrs", filter: {_id: , path: "_admin.deployed.VCA.3"} + It contains a dict with + {collection: , filter: {}, path: }, + e.g. {collection: "nsrs", filter: + {_id: , path: "_admin.deployed.VCA.3"} :param float progress_timeout: :param float total_timeout: :returns str: primitive result, if ok. It raises exceptions in case of fail @@ -353,9 +371,9 @@ class N2VCConnector(abc.ABC, Loggable): """ """ - ################################################################################################## - ########################################## P R I V A T E ######################################### - ################################################################################################## + #################################################################################### + ################################### P R I V A T E ################################## + #################################################################################### """ def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str): @@ -368,10 +386,12 @@ class N2VCConnector(abc.ABC, Loggable): # check parameters if namespace is None or len(namespace) == 0: - raise N2VCBadArgumentsException('Argument namespace is mandatory', ['namespace']) + raise N2VCBadArgumentsException( + "Argument namespace is mandatory", ["namespace"] + ) # split namespace components - parts = namespace.split('.') + parts = namespace.split(".") nsi_id = None ns_id = None vnf_id = None @@ -385,7 +405,7 @@ class N2VCConnector(abc.ABC, Loggable): vnf_id = parts[2] if len(parts) > 3 and len(parts[3]) > 0: vdu_id = parts[3] - vdu_parts = parts[3].split('-') + vdu_parts = parts[3].split("-") if len(vdu_parts) > 1: vdu_id = vdu_parts[0] vdu_count = vdu_parts[1] @@ -393,79 +413,85 @@ class N2VCConnector(abc.ABC, Loggable): return nsi_id, ns_id, vnf_id, vdu_id, vdu_count async def write_app_status_to_db( - self, - db_dict: dict, - status: N2VCDeploymentStatus, - detailed_status: str, - vca_status: str, - entity_type: str + self, + db_dict: dict, + status: N2VCDeploymentStatus, + detailed_status: str, + vca_status: str, + entity_type: str, ): if not db_dict: - self.log.debug('No db_dict => No database write') + self.log.debug("No db_dict => No database write") return - # self.log.debug('status={} / detailed-status={} / VCA-status={} / entity_type={}' - # .format(str(status.value), detailed_status, vca_status, entity_type)) + # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}' + # .format(str(status.value), detailed_status, vca_status, entity_type)) try: - the_table = db_dict['collection'] - the_filter = db_dict['filter'] - the_path = db_dict['path'] - if not the_path[-1] == '.': - the_path = the_path + '.' + the_table = db_dict["collection"] + the_filter = db_dict["filter"] + the_path = db_dict["path"] + if not the_path[-1] == ".": + the_path = the_path + "." update_dict = { - the_path + 'status': str(status.value), - the_path + 'detailed-status': detailed_status, - the_path + 'VCA-status': vca_status, - the_path + 'entity-type': entity_type, - the_path + 'status-time': str(time.time()), + the_path + "status": str(status.value), + the_path + "detailed-status": detailed_status, + the_path + "VCA-status": vca_status, + the_path + "entity-type": entity_type, + the_path + "status-time": str(time.time()), } self.db.set_one( table=the_table, q_filter=the_filter, update_dict=update_dict, - fail_on_empty=True + fail_on_empty=True, ) # database callback if self.on_update_db: if asyncio.iscoroutinefunction(self.on_update_db): - await self.on_update_db(the_table, the_filter, the_path, update_dict) + await self.on_update_db( + the_table, the_filter, the_path, update_dict + ) else: self.on_update_db(the_table, the_filter, the_path, update_dict) except DbException as e: if e.http_code == HTTPStatus.NOT_FOUND: - self.log.error('NOT_FOUND error: Exception writing status to database: {}'.format(e)) + self.log.error( + "NOT_FOUND error: Exception writing status to database: {}".format( + e + ) + ) else: - self.log.info('Exception writing status to database: {}'.format(e)) + self.log.info("Exception writing status to database: {}".format(e)) -def juju_status_2_osm_status(type: str, status: str) -> N2VCDeploymentStatus: - if type == 'application' or type == 'unit': - if status in ['waiting', 'maintenance']: +def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus: + if statustype == "application" or statustype == "unit": + if status in ["waiting", "maintenance"]: return N2VCDeploymentStatus.RUNNING - if status in ['error']: - return N2VCDeploymentStatus.FAILED - elif status in ['active']: + if status in ["error"]: + return N2VCDeploymentStatus.FAILED + elif status in ["active"]: return N2VCDeploymentStatus.COMPLETED - elif status in ['blocked']: + elif status in ["blocked"]: return N2VCDeploymentStatus.RUNNING else: return N2VCDeploymentStatus.UNKNOWN - elif type == 'action': - if status in ['running']: + elif statustype == "action": + if status in ["running"]: return N2VCDeploymentStatus.RUNNING - elif status in ['completed']: + elif status in ["completed"]: return N2VCDeploymentStatus.COMPLETED else: return N2VCDeploymentStatus.UNKNOWN - elif type == 'machine': - if status in ['pending']: + elif statustype == "machine": + if status in ["pending"]: return N2VCDeploymentStatus.PENDING - elif status in ['started']: + elif status in ["started"]: return N2VCDeploymentStatus.COMPLETED else: return N2VCDeploymentStatus.UNKNOWN @@ -479,12 +505,12 @@ def obj_to_yaml(obj: object) -> str: # split lines lines = dump_text.splitlines() # remove !!python/object tags - yaml_text = '' + yaml_text = "" for line in lines: - index = line.find('!!python/object') + index = line.find("!!python/object") if index >= 0: line = line[:index] - yaml_text += line + '\n' + yaml_text += line + "\n" return yaml_text diff --git a/n2vc/n2vc_juju_conn.py b/n2vc/n2vc_juju_conn.py index f48838d..0696e20 100644 --- a/n2vc/n2vc_juju_conn.py +++ b/n2vc/n2vc_juju_conn.py @@ -20,38 +20,42 @@ # contact with: nfvlabs@tid.es ## -import logging -import os import asyncio -import time import base64 import binascii +import logging +import os import re +import time -from n2vc.n2vc_conn import N2VCConnector -from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml -from n2vc.exceptions \ - import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \ - N2VCExecutionException, N2VCInvalidCertificate, N2VCNotFound -from n2vc.juju_observer import JujuModelObserver - -from juju.controller import Controller -from juju.model import Model -from juju.application import Application from juju.action import Action -from juju.machine import Machine +from juju.application import Application from juju.client import client +from juju.controller import Controller from juju.errors import JujuAPIError - +from juju.machine import Machine +from juju.model import Model +from n2vc.exceptions import ( + N2VCBadArgumentsException, + N2VCException, + N2VCConnectionException, + N2VCExecutionException, + N2VCInvalidCertificate, + N2VCNotFound, + MethodNotImplemented, +) +from n2vc.juju_observer import JujuModelObserver +from n2vc.n2vc_conn import N2VCConnector +from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml from n2vc.provisioner import SSHProvisioner class N2VCJujuConnector(N2VCConnector): """ - ################################################################################################## - ########################################## P U B L I C ########################################### - ################################################################################################## + #################################################################################### + ################################### P U B L I C #################################### + #################################################################################### """ BUILT_IN_CLOUDS = ["localhost", "microk8s"] @@ -62,10 +66,10 @@ class N2VCJujuConnector(N2VCConnector): fs: object, log: object = None, loop: object = None, - url: str = '127.0.0.1:17070', - username: str = 'admin', + url: str = "127.0.0.1:17070", + username: str = "admin", vca_config: dict = None, - on_update_db=None + on_update_db=None, ): """Initialize juju N2VC connector """ @@ -80,15 +84,15 @@ class N2VCJujuConnector(N2VCConnector): url=url, username=username, vca_config=vca_config, - on_update_db=on_update_db + on_update_db=on_update_db, ) # silence websocket traffic log - logging.getLogger('websockets.protocol').setLevel(logging.INFO) - logging.getLogger('juju.client.connection').setLevel(logging.WARN) - logging.getLogger('model').setLevel(logging.WARN) + logging.getLogger("websockets.protocol").setLevel(logging.INFO) + logging.getLogger("juju.client.connection").setLevel(logging.WARN) + logging.getLogger("model").setLevel(logging.WARN) - self.log.info('Initializing N2VC juju connector...') + self.log.info("Initializing N2VC juju connector...") """ ############################################################## @@ -98,33 +102,43 @@ class N2VCJujuConnector(N2VCConnector): # juju URL if url is None: - raise N2VCBadArgumentsException('Argument url is mandatory', ['url']) - url_parts = url.split(':') + raise N2VCBadArgumentsException("Argument url is mandatory", ["url"]) + url_parts = url.split(":") if len(url_parts) != 2: - raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url']) + raise N2VCBadArgumentsException( + "Argument url: bad format (localhost:port) -> {}".format(url), ["url"] + ) self.hostname = url_parts[0] try: self.port = int(url_parts[1]) except ValueError: - raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url']) + raise N2VCBadArgumentsException( + "url port must be a number -> {}".format(url), ["url"] + ) # juju USERNAME if username is None: - raise N2VCBadArgumentsException('Argument username is mandatory', ['username']) + raise N2VCBadArgumentsException( + "Argument username is mandatory", ["username"] + ) # juju CONFIGURATION if vca_config is None: - raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config']) + raise N2VCBadArgumentsException( + "Argument vca_config is mandatory", ["vca_config"] + ) - if 'secret' in vca_config: - self.secret = vca_config['secret'] + if "secret" in vca_config: + self.secret = vca_config["secret"] else: - raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret']) + raise N2VCBadArgumentsException( + "Argument vca_config.secret is mandatory", ["vca_config.secret"] + ) # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub # if exists, it will be written in lcm container: _create_juju_public_key() - if 'public_key' in vca_config: - self.public_key = vca_config['public_key'] + if "public_key" in vca_config: + self.public_key = vca_config["public_key"] else: self.public_key = None @@ -139,52 +153,57 @@ class N2VCJujuConnector(N2VCConnector): try: cacert = base64.b64decode(b64string).decode("utf-8") - cacert = re.sub( - r'\\n', - r'\n', - cacert, - ) + cacert = re.sub(r"\\n", r"\n", cacert,) except binascii.Error as e: self.log.debug("Caught binascii.Error: {}".format(e)) raise N2VCInvalidCertificate(message="Invalid CA Certificate") return cacert - self.ca_cert = vca_config.get('ca_cert') + self.ca_cert = vca_config.get("ca_cert") if self.ca_cert: - self.ca_cert = base64_to_cacert(vca_config['ca_cert']) + self.ca_cert = base64_to_cacert(vca_config["ca_cert"]) - if 'api_proxy' in vca_config: - self.api_proxy = vca_config['api_proxy'] - self.log.debug('api_proxy for native charms configured: {}'.format(self.api_proxy)) + if "api_proxy" in vca_config: + self.api_proxy = vca_config["api_proxy"] + self.log.debug( + "api_proxy for native charms configured: {}".format(self.api_proxy) + ) else: - self.warning('api_proxy is not configured. Support for native charms is disabled') + self.warning( + "api_proxy is not configured. Support for native charms is disabled" + ) - if 'enable_os_upgrade' in vca_config: - self.enable_os_upgrade = vca_config['enable_os_upgrade'] + if "enable_os_upgrade" in vca_config: + self.enable_os_upgrade = vca_config["enable_os_upgrade"] else: self.enable_os_upgrade = True - if 'apt_mirror' in vca_config: - self.apt_mirror = vca_config['apt_mirror'] + if "apt_mirror" in vca_config: + self.apt_mirror = vca_config["apt_mirror"] else: self.apt_mirror = None - self.cloud = vca_config.get('cloud') + self.cloud = vca_config.get("cloud") # self.log.debug('Arguments have been checked') # juju data - self.controller = None # it will be filled when connect to juju - self.juju_models = {} # model objects for every model_name - self.juju_observers = {} # model observers for every model_name - self._connecting = False # while connecting to juju (to avoid duplicate connections) - self._authenticated = False # it will be True when juju connection be stablished - self._creating_model = False # True during model creation - - # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub + self.controller = None # it will be filled when connect to juju + self.juju_models = {} # model objects for every model_name + self.juju_observers = {} # model observers for every model_name + self._connecting = ( + False # while connecting to juju (to avoid duplicate connections) + ) + self._authenticated = ( + False # it will be True when juju connection be stablished + ) + self._creating_model = False # True during model creation + + # create juju pub key file in lcm container at + # ./local/share/juju/ssh/juju_id_rsa.pub self._create_juju_public_key() - self.log.info('N2VC juju connector initialized') + self.log.info("N2VC juju connector initialized") async def get_status(self, namespace: str, yaml_format: bool = True): @@ -193,13 +212,15 @@ class N2VCJujuConnector(N2VCConnector): if not self._authenticated: await self._juju_login() - nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace) + _nsi_id, ns_id, _vnf_id, _vdu_id, _vdu_count = self._get_namespace_components( + namespace=namespace + ) # model name is ns_id model_name = ns_id if model_name is None: - msg = 'Namespace {} not valid'.format(namespace) + msg = "Namespace {} not valid".format(namespace) self.log.error(msg) - raise N2VCBadArgumentsException(msg, ['namespace']) + raise N2VCBadArgumentsException(msg, ["namespace"]) # get juju model (create model if needed) model = await self._juju_get_model(model_name=model_name) @@ -217,26 +238,41 @@ class N2VCJujuConnector(N2VCConnector): db_dict: dict, reuse_ee_id: str = None, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> (str, dict): - self.log.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id)) + self.log.info( + "Creating execution environment. namespace: {}, reuse_ee_id: {}".format( + namespace, reuse_ee_id + ) + ) if not self._authenticated: await self._juju_login() machine_id = None if reuse_ee_id: - model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id) + model_name, application_name, machine_id = self._get_ee_id_components( + ee_id=reuse_ee_id + ) else: - nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace) + ( + _nsi_id, + ns_id, + _vnf_id, + _vdu_id, + _vdu_count, + ) = self._get_namespace_components(namespace=namespace) # model name is ns_id model_name = ns_id # application name application_name = self._get_application_name(namespace=namespace) - self.log.debug('model name: {}, application name: {}, machine_id: {}' - .format(model_name, application_name, machine_id)) + self.log.debug( + "model name: {}, application name: {}, machine_id: {}".format( + model_name, application_name, machine_id + ) + ) # create or reuse a new juju machine try: @@ -246,10 +282,10 @@ class N2VCJujuConnector(N2VCConnector): machine_id=machine_id, db_dict=db_dict, progress_timeout=progress_timeout, - total_timeout=total_timeout + total_timeout=total_timeout, ) except Exception as e: - message = 'Error creating machine on juju: {}'.format(e) + message = "Error creating machine on juju: {}".format(e) self.log.error(message) raise N2VCException(message=message) @@ -257,15 +293,19 @@ class N2VCJujuConnector(N2VCConnector): ee_id = N2VCJujuConnector._build_ee_id( model_name=model_name, application_name=application_name, - machine_id=str(machine.entity_id) + machine_id=str(machine.entity_id), ) - self.log.debug('ee_id: {}'.format(ee_id)) + self.log.debug("ee_id: {}".format(ee_id)) # new machine credentials credentials = dict() - credentials['hostname'] = machine.dns_name + credentials["hostname"] = machine.dns_name - self.log.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials)) + self.log.info( + "Execution environment created. ee_id: {}, credentials: {}".format( + ee_id, credentials + ) + ) return ee_id, credentials @@ -275,31 +315,43 @@ class N2VCJujuConnector(N2VCConnector): credentials: dict, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> str: if not self._authenticated: await self._juju_login() - self.log.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials)) + self.log.info( + "Registering execution environment. namespace={}, credentials={}".format( + namespace, credentials + ) + ) if credentials is None: - raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials']) - if credentials.get('hostname'): - hostname = credentials['hostname'] + raise N2VCBadArgumentsException( + message="credentials are mandatory", bad_args=["credentials"] + ) + if credentials.get("hostname"): + hostname = credentials["hostname"] else: - raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname']) - if credentials.get('username'): - username = credentials['username'] + raise N2VCBadArgumentsException( + message="hostname is mandatory", bad_args=["credentials.hostname"] + ) + if credentials.get("username"): + username = credentials["username"] else: - raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username']) - if 'private_key_path' in credentials: - private_key_path = credentials['private_key_path'] + raise N2VCBadArgumentsException( + message="username is mandatory", bad_args=["credentials.username"] + ) + if "private_key_path" in credentials: + private_key_path = credentials["private_key_path"] else: # if not passed as argument, use generated private key path private_key_path = self.private_key_path - nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace) + _nsi_id, ns_id, _vnf_id, _vdu_id, _vdu_count = self._get_namespace_components( + namespace=namespace + ) # model name model_name = ns_id @@ -315,22 +367,24 @@ class N2VCJujuConnector(N2VCConnector): private_key_path=private_key_path, db_dict=db_dict, progress_timeout=progress_timeout, - total_timeout=total_timeout + total_timeout=total_timeout, ) except Exception as e: - self.log.error('Error registering machine: {}'.format(e)) - raise N2VCException(message='Error registering machine on juju: {}'.format(e)) + self.log.error("Error registering machine: {}".format(e)) + raise N2VCException( + message="Error registering machine on juju: {}".format(e) + ) - self.log.info('Machine registered: {}'.format(machine_id)) + self.log.info("Machine registered: {}".format(machine_id)) # id for the execution environment ee_id = N2VCJujuConnector._build_ee_id( model_name=model_name, application_name=application_name, - machine_id=str(machine_id) + machine_id=str(machine_id), ) - self.log.info('Execution environment registered. ee_id: {}'.format(ee_id)) + self.log.info("Execution environment registered. ee_id: {}".format(ee_id)) return ee_id @@ -344,45 +398,65 @@ class N2VCJujuConnector(N2VCConnector): config: dict = None, ): - self.log.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}' - .format(ee_id, artifact_path, db_dict)) + self.log.info( + ( + "Installing configuration sw on ee_id: {}, " + "artifact path: {}, db_dict: {}" + ).format(ee_id, artifact_path, db_dict) + ) if not self._authenticated: await self._juju_login() # check arguments if ee_id is None or len(ee_id) == 0: - raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id']) + raise N2VCBadArgumentsException( + message="ee_id is mandatory", bad_args=["ee_id"] + ) if artifact_path is None or len(artifact_path) == 0: - raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path']) + raise N2VCBadArgumentsException( + message="artifact_path is mandatory", bad_args=["artifact_path"] + ) if db_dict is None: - raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict']) + raise N2VCBadArgumentsException( + message="db_dict is mandatory", bad_args=["db_dict"] + ) try: - model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id) - self.log.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id)) - except Exception as e: + ( + model_name, + application_name, + machine_id, + ) = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id) + self.log.debug( + "model: {}, application: {}, machine: {}".format( + model_name, application_name, machine_id + ) + ) + except Exception: raise N2VCBadArgumentsException( - message='ee_id={} is not a valid execution environment id'.format(ee_id), - bad_args=['ee_id'] + message="ee_id={} is not a valid execution environment id".format( + ee_id + ), + bad_args=["ee_id"], ) # remove // in charm path - while artifact_path.find('//') >= 0: - artifact_path = artifact_path.replace('//', '/') + while artifact_path.find("//") >= 0: + artifact_path = artifact_path.replace("//", "/") # check charm path if not self.fs.file_exists(artifact_path, mode="dir"): - msg = 'artifact path does not exist: {}'.format(artifact_path) - raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path']) + msg = "artifact path does not exist: {}".format(artifact_path) + raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"]) - if artifact_path.startswith('/'): + if artifact_path.startswith("/"): full_path = self.fs.path + artifact_path else: - full_path = self.fs.path + '/' + artifact_path + full_path = self.fs.path + "/" + artifact_path try: - application, retries = await self._juju_deploy_charm( + await self._juju_deploy_charm( model_name=model_name, application_name=application_name, charm_path=full_path, @@ -390,39 +464,59 @@ class N2VCJujuConnector(N2VCConnector): db_dict=db_dict, progress_timeout=progress_timeout, total_timeout=total_timeout, - config=config + config=config, ) except Exception as e: - raise N2VCException(message='Error desploying charm into ee={} : {}'.format(ee_id, e)) + raise N2VCException( + message="Error desploying charm into ee={} : {}".format(ee_id, e) + ) - self.log.info('Configuration sw installed') + self.log.info("Configuration sw installed") async def get_ee_ssh_public__key( self, ee_id: str, db_dict: dict, progress_timeout: float = None, - total_timeout: float = None + total_timeout: float = None, ) -> str: - self.log.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict)) + self.log.info( + ( + "Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}" + ).format(ee_id, db_dict) + ) if not self._authenticated: await self._juju_login() # check arguments if ee_id is None or len(ee_id) == 0: - raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id']) + raise N2VCBadArgumentsException( + message="ee_id is mandatory", bad_args=["ee_id"] + ) if db_dict is None: - raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict']) + raise N2VCBadArgumentsException( + message="db_dict is mandatory", bad_args=["db_dict"] + ) try: - model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id) - self.log.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id)) - except Exception as e: + ( + model_name, + application_name, + machine_id, + ) = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id) + self.log.debug( + "model: {}, application: {}, machine: {}".format( + model_name, application_name, machine_id + ) + ) + except Exception: raise N2VCBadArgumentsException( - message='ee_id={} is not a valid execution environment id'.format(ee_id), - bad_args=['ee_id'] + message="ee_id={} is not a valid execution environment id".format( + ee_id + ), + bad_args=["ee_id"], ) # try to execute ssh layer primitives (if exist): @@ -433,29 +527,33 @@ class N2VCJujuConnector(N2VCConnector): # execute action: generate-ssh-key try: - output, status = await self._juju_execute_action( + output, _status = await self._juju_execute_action( model_name=model_name, application_name=application_name, - action_name='generate-ssh-key', + action_name="generate-ssh-key", db_dict=db_dict, progress_timeout=progress_timeout, - total_timeout=total_timeout + total_timeout=total_timeout, ) except Exception as e: - self.log.info('Skipping exception while executing action generate-ssh-key: {}'.format(e)) + self.log.info( + "Skipping exception while executing action generate-ssh-key: {}".format( + e + ) + ) # execute action: get-ssh-public-key try: - output, status = await self._juju_execute_action( + output, _status = await self._juju_execute_action( model_name=model_name, application_name=application_name, - action_name='get-ssh-public-key', + action_name="get-ssh-public-key", db_dict=db_dict, progress_timeout=progress_timeout, - total_timeout=total_timeout + total_timeout=total_timeout, ) except Exception as e: - msg = 'Cannot execute action get-ssh-public-key: {}\n'.format(e) + msg = "Cannot execute action get-ssh-public-key: {}\n".format(e) self.log.info(msg) raise N2VCException(msg) @@ -463,46 +561,47 @@ class N2VCJujuConnector(N2VCConnector): return output["pubkey"] if "pubkey" in output else output async def add_relation( - self, - ee_id_1: str, - ee_id_2: str, - endpoint_1: str, - endpoint_2: str + self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str ): - self.log.debug('adding new relation between {} and {}, endpoints: {}, {}' - .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2)) + self.log.debug( + "adding new relation between {} and {}, endpoints: {}, {}".format( + ee_id_1, ee_id_2, endpoint_1, endpoint_2 + ) + ) # check arguments if not ee_id_1: - message = 'EE 1 is mandatory' + message = "EE 1 is mandatory" self.log.error(message) - raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1']) + raise N2VCBadArgumentsException(message=message, bad_args=["ee_id_1"]) if not ee_id_2: - message = 'EE 2 is mandatory' + message = "EE 2 is mandatory" self.log.error(message) - raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_2']) + raise N2VCBadArgumentsException(message=message, bad_args=["ee_id_2"]) if not endpoint_1: - message = 'endpoint 1 is mandatory' + message = "endpoint 1 is mandatory" self.log.error(message) - raise N2VCBadArgumentsException(message=message, bad_args=['endpoint_1']) + raise N2VCBadArgumentsException(message=message, bad_args=["endpoint_1"]) if not endpoint_2: - message = 'endpoint 2 is mandatory' + message = "endpoint 2 is mandatory" self.log.error(message) - raise N2VCBadArgumentsException(message=message, bad_args=['endpoint_2']) + raise N2VCBadArgumentsException(message=message, bad_args=["endpoint_2"]) if not self._authenticated: await self._juju_login() # get the model, the applications and the machines from the ee_id's - model_1, app_1, machine_1 = self._get_ee_id_components(ee_id_1) - model_2, app_2, machine_2 = self._get_ee_id_components(ee_id_2) + model_1, app_1, _machine_1 = self._get_ee_id_components(ee_id_1) + model_2, app_2, _machine_2 = self._get_ee_id_components(ee_id_2) # model must be the same if model_1 != model_2: - message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2) + message = "EE models are not the same: {} vs {}".format(ee_id_1, ee_id_2) self.log.error(message) - raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2']) + raise N2VCBadArgumentsException( + message=message, bad_args=["ee_id_1", "ee_id_2"] + ) # add juju relations between two applications try: @@ -511,131 +610,154 @@ class N2VCJujuConnector(N2VCConnector): application_name_1=app_1, application_name_2=app_2, relation_1=endpoint_1, - relation_2=endpoint_2 + relation_2=endpoint_2, ) except Exception as e: - message = 'Error adding relation between {} and {}: {}'.format(ee_id_1, ee_id_2, e) + message = "Error adding relation between {} and {}: {}".format( + ee_id_1, ee_id_2, e + ) self.log.error(message) raise N2VCException(message=message) - async def remove_relation( - self - ): + async def remove_relation(self): if not self._authenticated: await self._juju_login() # TODO - self.log.info('Method not implemented yet') - raise NotImplemented() + self.log.info("Method not implemented yet") + raise MethodNotImplemented() - async def deregister_execution_environments( - self - ): + async def deregister_execution_environments(self): if not self._authenticated: await self._juju_login() # TODO - self.log.info('Method not implemented yet') - raise NotImplemented() + self.log.info("Method not implemented yet") + raise MethodNotImplemented() async def delete_namespace( - self, - namespace: str, - db_dict: dict = None, - total_timeout: float = None + self, namespace: str, db_dict: dict = None, total_timeout: float = None ): - self.log.info('Deleting namespace={}'.format(namespace)) + self.log.info("Deleting namespace={}".format(namespace)) if not self._authenticated: await self._juju_login() # check arguments if namespace is None: - raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace']) + raise N2VCBadArgumentsException( + message="namespace is mandatory", bad_args=["namespace"] + ) - nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace) + _nsi_id, ns_id, _vnf_id, _vdu_id, _vdu_count = self._get_namespace_components( + namespace=namespace + ) if ns_id is not None: try: await self._juju_destroy_model( - model_name=ns_id, - total_timeout=total_timeout + model_name=ns_id, total_timeout=total_timeout ) except N2VCNotFound: raise except Exception as e: - raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, e)) + raise N2VCException( + message="Error deleting namespace {} : {}".format(namespace, e) + ) else: - raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace']) + raise N2VCBadArgumentsException( + message="only ns_id is permitted to delete yet", bad_args=["namespace"] + ) - self.log.info('Namespace {} deleted'.format(namespace)) + self.log.info("Namespace {} deleted".format(namespace)) async def delete_execution_environment( - self, - ee_id: str, - db_dict: dict = None, - total_timeout: float = None + self, ee_id: str, db_dict: dict = None, total_timeout: float = None ): - self.log.info('Deleting execution environment ee_id={}'.format(ee_id)) + self.log.info("Deleting execution environment ee_id={}".format(ee_id)) if not self._authenticated: await self._juju_login() # check arguments if ee_id is None: - raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id']) + raise N2VCBadArgumentsException( + message="ee_id is mandatory", bad_args=["ee_id"] + ) - model_name, application_name, machine_id = self._get_ee_id_components(ee_id=ee_id) + model_name, application_name, _machine_id = self._get_ee_id_components( + ee_id=ee_id + ) # destroy the application try: - await self._juju_destroy_application(model_name=model_name, application_name=application_name) + await self._juju_destroy_application( + model_name=model_name, application_name=application_name + ) except Exception as e: - raise N2VCException(message='Error deleting execution environment {} (application {}) : {}' - .format(ee_id, application_name, e)) + raise N2VCException( + message=( + "Error deleting execution environment {} (application {}) : {}" + ).format(ee_id, application_name, e) + ) # destroy the machine - # try: + # try: # await self._juju_destroy_machine( # model_name=model_name, # machine_id=machine_id, # total_timeout=total_timeout # ) # except Exception as e: - # raise N2VCException(message='Error deleting execution environment {} (machine {}) : {}' - # .format(ee_id, machine_id, e)) + # raise N2VCException( + # message='Error deleting execution environment {} (machine {}) : {}' + # .format(ee_id, machine_id, e)) - self.log.info('Execution environment {} deleted'.format(ee_id)) + self.log.info("Execution environment {} deleted".format(ee_id)) async def exec_primitive( - self, - ee_id: str, - primitive_name: str, - params_dict: dict, - db_dict: dict = None, - progress_timeout: float = None, - total_timeout: float = None + self, + ee_id: str, + primitive_name: str, + params_dict: dict, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, ) -> str: - self.log.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict)) + self.log.info( + "Executing primitive: {} on ee: {}, params: {}".format( + primitive_name, ee_id, params_dict + ) + ) if not self._authenticated: await self._juju_login() # check arguments if ee_id is None or len(ee_id) == 0: - raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id']) + raise N2VCBadArgumentsException( + message="ee_id is mandatory", bad_args=["ee_id"] + ) if primitive_name is None or len(primitive_name) == 0: - raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name']) + raise N2VCBadArgumentsException( + message="action_name is mandatory", bad_args=["action_name"] + ) if params_dict is None: params_dict = dict() try: - model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id) + ( + model_name, + application_name, + _machine_id, + ) = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id) except Exception: raise N2VCBadArgumentsException( - message='ee_id={} is not a valid execution environment id'.format(ee_id), - bad_args=['ee_id'] + message="ee_id={} is not a valid execution environment id".format( + ee_id + ), + bad_args=["ee_id"], ) - if primitive_name == 'config': + if primitive_name == "config": # Special case: config primitive try: await self._juju_configure_application( @@ -644,15 +766,17 @@ class N2VCJujuConnector(N2VCConnector): config=params_dict, db_dict=db_dict, progress_timeout=progress_timeout, - total_timeout=total_timeout + total_timeout=total_timeout, ) except Exception as e: - self.log.error('Error configuring juju application: {}'.format(e)) + self.log.error("Error configuring juju application: {}".format(e)) raise N2VCExecutionException( - message='Error configuring application into ee={} : {}'.format(ee_id, e), - primitive_name=primitive_name + message="Error configuring application into ee={} : {}".format( + ee_id, e + ), + primitive_name=primitive_name, ) - return 'CONFIG OK' + return "CONFIG OK" else: try: output, status = await self._juju_execute_action( @@ -664,59 +788,55 @@ class N2VCJujuConnector(N2VCConnector): total_timeout=total_timeout, **params_dict ) - if status == 'completed': + if status == "completed": return output else: - raise Exception('status is not completed: {}'.format(status)) + raise Exception("status is not completed: {}".format(status)) except Exception as e: - self.log.error('Error executing primitive {}: {}'.format(primitive_name, e)) + self.log.error( + "Error executing primitive {}: {}".format(primitive_name, e) + ) raise N2VCExecutionException( - message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, e), - primitive_name=primitive_name + message="Error executing primitive {} into ee={} : {}".format( + primitive_name, ee_id, e + ), + primitive_name=primitive_name, ) async def disconnect(self): - self.log.info('closing juju N2VC...') + self.log.info("closing juju N2VC...") await self._juju_logout() """ - ################################################################################################## - ########################################## P R I V A T E ######################################### - ################################################################################################## + #################################################################################### + ################################### P R I V A T E ################################## + #################################################################################### """ - def _write_ee_id_db( - self, - db_dict: dict, - ee_id: str - ): + def _write_ee_id_db(self, db_dict: dict, ee_id: str): # write ee_id to database: _admin.deployed.VCA.x try: - the_table = db_dict['collection'] - the_filter = db_dict['filter'] - the_path = db_dict['path'] - if not the_path[-1] == '.': - the_path = the_path + '.' - update_dict = {the_path + 'ee_id': ee_id} + the_table = db_dict["collection"] + the_filter = db_dict["filter"] + the_path = db_dict["path"] + if not the_path[-1] == ".": + the_path = the_path + "." + update_dict = {the_path + "ee_id": ee_id} # self.log.debug('Writing ee_id to database: {}'.format(the_path)) self.db.set_one( table=the_table, q_filter=the_filter, update_dict=update_dict, - fail_on_empty=True + fail_on_empty=True, ) except asyncio.CancelledError: raise except Exception as e: - self.log.error('Error writing ee_id to database: {}'.format(e)) + self.log.error("Error writing ee_id to database: {}".format(e)) @staticmethod - def _build_ee_id( - model_name: str, - application_name: str, - machine_id: str - ): + def _build_ee_id(model_name: str, application_name: str, machine_id: str): """ Build an execution environment id form model, application and machine :param model_name: @@ -725,12 +845,10 @@ class N2VCJujuConnector(N2VCConnector): :return: """ # id for the execution environment - return '{}.{}.{}'.format(model_name, application_name, machine_id) + return "{}.{}.{}".format(model_name, application_name, machine_id) @staticmethod - def _get_ee_id_components( - ee_id: str - ) -> (str, str, str): + def _get_ee_id_components(ee_id: str) -> (str, str, str): """ Get model, application and machine components from an execution environment id :param ee_id: @@ -741,7 +859,7 @@ class N2VCJujuConnector(N2VCConnector): return None, None, None # split components of id - parts = ee_id.split('.') + parts = ee_id.split(".") model_name = parts[0] application_name = parts[1] machine_id = parts[2] @@ -757,40 +875,46 @@ class N2VCJujuConnector(N2VCConnector): # TODO: Enforce the Juju 50-character application limit # split namespace components - _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace) + _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components( + namespace=namespace + ) if vnf_id is None or len(vnf_id) == 0: - vnf_id = '' + vnf_id = "" else: # Shorten the vnf_id to its last twelve characters - vnf_id = 'vnf-' + vnf_id[-12:] + vnf_id = "vnf-" + vnf_id[-12:] if vdu_id is None or len(vdu_id) == 0: - vdu_id = '' + vdu_id = "" else: # Shorten the vdu_id to its last twelve characters - vdu_id = '-vdu-' + vdu_id[-12:] + vdu_id = "-vdu-" + vdu_id[-12:] if vdu_count is None or len(vdu_count) == 0: - vdu_count = '' + vdu_count = "" else: - vdu_count = '-cnt-' + vdu_count + vdu_count = "-cnt-" + vdu_count - application_name = 'app-{}{}{}'.format(vnf_id, vdu_id, vdu_count) + application_name = "app-{}{}{}".format(vnf_id, vdu_id, vdu_count) return N2VCJujuConnector._format_app_name(application_name) async def _juju_create_machine( - self, - model_name: str, - application_name: str, - machine_id: str = None, - db_dict: dict = None, - progress_timeout: float = None, - total_timeout: float = None + self, + model_name: str, + application_name: str, + machine_id: str = None, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, ) -> Machine: - self.log.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id)) + self.log.debug( + "creating machine in model: {}, existing machine id: {}".format( + model_name, machine_id + ) + ) # get juju model and observer (create model if needed) model = await self._juju_get_model(model_name=model_name) @@ -799,21 +923,20 @@ class N2VCJujuConnector(N2VCConnector): # find machine id in model machine = None if machine_id is not None: - self.log.debug('Finding existing machine id {} in model'.format(machine_id)) + self.log.debug("Finding existing machine id {} in model".format(machine_id)) # get juju existing machines in the model existing_machines = await model.get_machines() if machine_id in existing_machines: - self.log.debug('Machine id {} found in model (reusing it)'.format(machine_id)) + self.log.debug( + "Machine id {} found in model (reusing it)".format(machine_id) + ) machine = model.machines[machine_id] if machine is None: - self.log.debug('Creating a new machine in juju...') + self.log.debug("Creating a new machine in juju...") # machine does not exist, create it and wait for it machine = await model.add_machine( - spec=None, - constraints=None, - disks=None, - series='xenial' + spec=None, constraints=None, disks=None, series="xenial" ) # register machine with observer @@ -823,55 +946,58 @@ class N2VCJujuConnector(N2VCConnector): ee_id = N2VCJujuConnector._build_ee_id( model_name=model_name, application_name=application_name, - machine_id=str(machine.entity_id) + machine_id=str(machine.entity_id), ) # write ee_id in database - self._write_ee_id_db( - db_dict=db_dict, - ee_id=ee_id - ) + self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id) # wait for machine creation await observer.wait_for_machine( machine_id=str(machine.entity_id), progress_timeout=progress_timeout, - total_timeout=total_timeout + total_timeout=total_timeout, ) else: - self.log.debug('Reusing old machine pending') + self.log.debug("Reusing old machine pending") # register machine with observer observer.register_machine(machine=machine, db_dict=db_dict) - # machine does exist, but it is in creation process (pending), wait for create finalisation + # machine does exist, but it is in creation process (pending), wait for + # create finalisation await observer.wait_for_machine( machine_id=machine.entity_id, progress_timeout=progress_timeout, - total_timeout=total_timeout) + total_timeout=total_timeout, + ) self.log.debug("Machine ready at " + str(machine.dns_name)) return machine async def _juju_provision_machine( - self, - model_name: str, - hostname: str, - username: str, - private_key_path: str, - db_dict: dict = None, - progress_timeout: float = None, - total_timeout: float = None + self, + model_name: str, + hostname: str, + username: str, + private_key_path: str, + db_dict: dict = None, + progress_timeout: float = None, + total_timeout: float = None, ) -> str: if not self.api_proxy: - msg = 'Cannot provision machine: api_proxy is not defined' + msg = "Cannot provision machine: api_proxy is not defined" self.log.error(msg=msg) raise N2VCException(message=msg) - self.log.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username)) + self.log.debug( + "provisioning machine. model: {}, hostname: {}, username: {}".format( + model_name, hostname, username + ) + ) if not self._authenticated: await self._juju_login() @@ -887,7 +1013,7 @@ class N2VCJujuConnector(N2VCConnector): host=hostname, user=username, private_key_path=private_key_path, - log=self.log + log=self.log, ) params = None @@ -898,7 +1024,7 @@ class N2VCJujuConnector(N2VCConnector): self.log.error(msg) raise N2VCException(message=msg) - params.jobs = ['JobHostUnits'] + params.jobs = ["JobHostUnits"] connection = model.connection() @@ -917,25 +1043,28 @@ class N2VCJujuConnector(N2VCConnector): # Need to run this after AddMachines has been called, # as we need the machine_id self.log.debug("Installing Juju agent into machine {}".format(machine_id)) - asyncio.ensure_future(provisioner.install_agent( - connection=connection, - nonce=params.nonce, - machine_id=machine_id, - api=self.api_proxy, - )) + asyncio.ensure_future( + provisioner.install_agent( + connection=connection, + nonce=params.nonce, + machine_id=machine_id, + api=self.api_proxy, + ) + ) - # wait for machine in model (now, machine is not yet in model, so we must wait for it) + # wait for machine in model (now, machine is not yet in model, so we must + # wait for it) machine = None - for i in range(10): + for _ in range(10): machine_list = await model.get_machines() if machine_id in machine_list: - self.log.debug('Machine {} found in model!'.format(machine_id)) + self.log.debug("Machine {} found in model!".format(machine_id)) machine = model.machines.get(machine_id) break await asyncio.sleep(2) if machine is None: - msg = 'Machine {} not found in model'.format(machine_id) + msg = "Machine {} not found in model".format(machine_id) self.log.error(msg=msg) raise Exception(msg) @@ -943,11 +1072,11 @@ class N2VCJujuConnector(N2VCConnector): observer.register_machine(machine=machine, db_dict=db_dict) # wait for machine creation - self.log.debug('waiting for provision finishes... {}'.format(machine_id)) + self.log.debug("waiting for provision finishes... {}".format(machine_id)) await observer.wait_for_machine( machine_id=machine_id, progress_timeout=progress_timeout, - total_timeout=total_timeout + total_timeout=total_timeout, ) self.log.debug("Machine provisioned {}".format(machine_id)) @@ -955,15 +1084,15 @@ class N2VCJujuConnector(N2VCConnector): return machine_id async def _juju_deploy_charm( - self, - model_name: str, - application_name: str, - charm_path: str, - machine_id: str, - db_dict: dict, - progress_timeout: float = None, - total_timeout: float = None, - config: dict = None + self, + model_name: str, + application_name: str, + charm_path: str, + machine_id: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + config: dict = None, ) -> (Application, int): # get juju model and observer @@ -978,30 +1107,36 @@ class N2VCJujuConnector(N2VCConnector): if application is None: # application does not exist, create it and wait for it - self.log.debug('deploying application {} to machine {}, model {}' - .format(application_name, machine_id, model_name)) - self.log.debug('charm: {}'.format(charm_path)) - series = 'xenial' + self.log.debug( + "deploying application {} to machine {}, model {}".format( + application_name, machine_id, model_name + ) + ) + self.log.debug("charm: {}".format(charm_path)) + series = "xenial" # series = None application = await model.deploy( entity_url=charm_path, application_name=application_name, - channel='stable', + channel="stable", num_units=1, series=series, to=machine_id, - config=config + config=config, ) # register application with observer observer.register_application(application=application, db_dict=db_dict) - self.log.debug('waiting for application deployed... {}'.format(application.entity_id)) + self.log.debug( + "waiting for application deployed... {}".format(application.entity_id) + ) retries = await observer.wait_for_application( application_id=application.entity_id, progress_timeout=progress_timeout, - total_timeout=total_timeout) - self.log.debug('application deployed') + total_timeout=total_timeout, + ) + self.log.debug("application deployed") else: @@ -1009,31 +1144,34 @@ class N2VCJujuConnector(N2VCConnector): observer.register_application(application=application, db_dict=db_dict) # application already exists, but not finalised - self.log.debug('application already exists, waiting for deployed...') + self.log.debug("application already exists, waiting for deployed...") retries = await observer.wait_for_application( application_id=application.entity_id, progress_timeout=progress_timeout, - total_timeout=total_timeout) - self.log.debug('application deployed') + total_timeout=total_timeout, + ) + self.log.debug("application deployed") return application, retries async def _juju_execute_action( - self, - model_name: str, - application_name: str, - action_name: str, - db_dict: dict, - progress_timeout: float = None, - total_timeout: float = None, - **kwargs + self, + model_name: str, + application_name: str, + action_name: str, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, + **kwargs ) -> Action: # get juju model and observer model = await self._juju_get_model(model_name=model_name) observer = self.juju_observers[model_name] - application = await self._juju_get_application(model_name=model_name, application_name=application_name) + application = await self._juju_get_application( + model_name=model_name, application_name=application_name + ) unit = None for u in application.units: @@ -1042,7 +1180,9 @@ class N2VCJujuConnector(N2VCConnector): if unit is not None: actions = await application.get_actions() if action_name in actions: - self.log.debug('executing action "{}" using params: {}'.format(action_name, kwargs)) + self.log.debug( + 'executing action "{}" using params: {}'.format(action_name, kwargs) + ) action = await unit.run_action(action_name, **kwargs) # register action with observer @@ -1051,86 +1191,98 @@ class N2VCJujuConnector(N2VCConnector): await observer.wait_for_action( action_id=action.entity_id, progress_timeout=progress_timeout, - total_timeout=total_timeout) - self.log.debug('action completed with status: {}'.format(action.status)) + total_timeout=total_timeout, + ) + self.log.debug("action completed with status: {}".format(action.status)) output = await model.get_action_output(action_uuid=action.entity_id) status = await model.get_action_status(uuid_or_prefix=action.entity_id) if action.entity_id in status: status = status[action.entity_id] else: - status = 'failed' + status = "failed" return output, status raise N2VCExecutionException( - message='Cannot execute action on charm', - primitive_name=action_name + message="Cannot execute action on charm", primitive_name=action_name ) async def _juju_configure_application( - self, - model_name: str, - application_name: str, - config: dict, - db_dict: dict, - progress_timeout: float = None, - total_timeout: float = None + self, + model_name: str, + application_name: str, + config: dict, + db_dict: dict, + progress_timeout: float = None, + total_timeout: float = None, ): # get the application - application = await self._juju_get_application(model_name=model_name, application_name=application_name) + application = await self._juju_get_application( + model_name=model_name, application_name=application_name + ) - self.log.debug('configuring the application {} -> {}'.format(application_name, config)) + self.log.debug( + "configuring the application {} -> {}".format(application_name, config) + ) res = await application.set_config(config) - self.log.debug('application {} configured. res={}'.format(application_name, res)) + self.log.debug( + "application {} configured. res={}".format(application_name, res) + ) # Verify the config is set new_conf = await application.get_config() for key in config: - value = new_conf[key]['value'] - self.log.debug(' {} = {}'.format(key, value)) + value = new_conf[key]["value"] + self.log.debug(" {} = {}".format(key, value)) if config[key] != value: raise N2VCException( - message='key {} is not configured correctly {} != {}'.format(key, config[key], new_conf[key]) + message="key {} is not configured correctly {} != {}".format( + key, config[key], new_conf[key] + ) ) # check if 'verify-ssh-credentials' action exists # unit = application.units[0] actions = await application.get_actions() - if 'verify-ssh-credentials' not in actions: - msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name) + if "verify-ssh-credentials" not in actions: + msg = ( + "Action verify-ssh-credentials does not exist in application {}" + ).format(application_name) self.log.debug(msg=msg) return False # execute verify-credentials num_retries = 20 retry_timeout = 15.0 - for i in range(num_retries): + for _ in range(num_retries): try: - self.log.debug('Executing action verify-ssh-credentials...') + self.log.debug("Executing action verify-ssh-credentials...") output, ok = await self._juju_execute_action( model_name=model_name, application_name=application_name, - action_name='verify-ssh-credentials', + action_name="verify-ssh-credentials", db_dict=db_dict, progress_timeout=progress_timeout, - total_timeout=total_timeout + total_timeout=total_timeout, ) - self.log.debug('Result: {}, output: {}'.format(ok, output)) + self.log.debug("Result: {}, output: {}".format(ok, output)) return True except asyncio.CancelledError: raise except Exception as e: - self.log.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e)) + self.log.debug( + "Error executing verify-ssh-credentials: {}. Retrying...".format(e) + ) await asyncio.sleep(retry_timeout) else: - self.log.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries)) + self.log.error( + "Error executing verify-ssh-credentials after {} retries. ".format( + num_retries + ) + ) return False - async def _juju_get_application( - self, - model_name: str, - application_name: str - ): + async def _juju_get_application(self, model_name: str, application_name: str): """Get the deployed application.""" model = await self._juju_get_model(model_name=model_name) @@ -1140,7 +1292,11 @@ class N2VCJujuConnector(N2VCConnector): if model.applications and application_name in model.applications: return model.applications[application_name] else: - raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name)) + raise N2VCException( + message="Cannot get application {} from model {}".format( + application_name, model_name + ) + ) async def _juju_get_model(self, model_name: str) -> Model: """ Get a model object from juju controller @@ -1157,7 +1313,7 @@ class N2VCJujuConnector(N2VCConnector): return self.juju_models[model_name] if self._creating_model: - self.log.debug('Another coroutine is creating a model. Wait...') + self.log.debug("Another coroutine is creating a model. Wait...") while self._creating_model: # another coroutine is creating a model, wait await asyncio.sleep(0.1) @@ -1172,10 +1328,12 @@ class N2VCJujuConnector(N2VCConnector): model_list = await self.controller.list_models() if model_name not in model_list: - self.log.info('Model {} does not exist. Creating new model...'.format(model_name)) - config_dict = {'authorized-keys': self.public_key} + self.log.info( + "Model {} does not exist. Creating new model...".format(model_name) + ) + config_dict = {"authorized-keys": self.public_key} if self.apt_mirror: - config_dict['apt-mirror'] = self.apt_mirror + config_dict["apt-mirror"] = self.apt_mirror if not self.enable_os_upgrade: config_dict['enable-os-refresh-update'] = False config_dict['enable-os-upgrade'] = False @@ -1194,56 +1352,57 @@ class N2VCJujuConnector(N2VCConnector): ) self.log.info('New model created, name={}'.format(model_name)) else: - self.log.debug('Model already exists in juju. Getting model {}'.format(model_name)) + self.log.debug( + "Model already exists in juju. Getting model {}".format(model_name) + ) model = await self.controller.get_model(model_name) - self.log.debug('Existing model in juju, name={}'.format(model_name)) + self.log.debug("Existing model in juju, name={}".format(model_name)) self.juju_models[model_name] = model self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model) return model except Exception as e: - msg = 'Cannot get model {}. Exception: {}'.format(model_name, e) + msg = "Cannot get model {}. Exception: {}".format(model_name, e) self.log.error(msg) raise N2VCException(msg) finally: self._creating_model = False async def _juju_add_relation( - self, - model_name: str, - application_name_1: str, - application_name_2: str, - relation_1: str, - relation_2: str + self, + model_name: str, + application_name_1: str, + application_name_2: str, + relation_1: str, + relation_2: str, ): # get juju model and observer model = await self._juju_get_model(model_name=model_name) - r1 = '{}:{}'.format(application_name_1, relation_1) - r2 = '{}:{}'.format(application_name_2, relation_2) + r1 = "{}:{}".format(application_name_1, relation_1) + r2 = "{}:{}".format(application_name_2, relation_2) - self.log.debug('adding relation: {} -> {}'.format(r1, r2)) + self.log.debug("adding relation: {} -> {}".format(r1, r2)) try: await model.add_relation(relation1=r1, relation2=r2) except JujuAPIError as e: - # If one of the applications in the relationship doesn't exist, or the relation has already been added, + # If one of the applications in the relationship doesn't exist, or the + # relation has already been added, # let the operation fail silently. - if 'not found' in e.message: + if "not found" in e.message: return - if 'already exists' in e.message: + if "already exists" in e.message: return # another execption, raise it raise e - async def _juju_destroy_application( - self, - model_name: str, - application_name: str - ): + async def _juju_destroy_application(self, model_name: str, application_name: str): - self.log.debug('Destroying application {} in model {}'.format(application_name, model_name)) + self.log.debug( + "Destroying application {} in model {}".format(application_name, model_name) + ) # get juju model and observer model = await self._juju_get_model(model_name=model_name) @@ -1254,16 +1413,15 @@ class N2VCJujuConnector(N2VCConnector): observer.unregister_application(application_name) await application.destroy() else: - self.log.debug('Application not found: {}'.format(application_name)) + self.log.debug("Application not found: {}".format(application_name)) async def _juju_destroy_machine( - self, - model_name: str, - machine_id: str, - total_timeout: float = None + self, model_name: str, machine_id: str, total_timeout: float = None ): - self.log.debug('Destroying machine {} in model {}'.format(machine_id, model_name)) + self.log.debug( + "Destroying machine {} in model {}".format(machine_id, model_name) + ) if total_timeout is None: total_timeout = 3600 @@ -1276,7 +1434,8 @@ class N2VCJujuConnector(N2VCConnector): if machine_id in machines: machine = model.machines[machine_id] observer.unregister_machine(machine_id) - # TODO: change this by machine.is_manual when this is upstreamed: https://github.com/juju/python-libjuju/pull/396 + # TODO: change this by machine.is_manual when this is upstreamed: + # https://github.com/juju/python-libjuju/pull/396 if "instance-id" in machine.safe_data and machine.safe_data[ "instance-id" ].startswith("manual:"): @@ -1288,20 +1447,18 @@ class N2VCJujuConnector(N2VCConnector): # wait for machine removal machines = await model.get_machines() while machine_id in machines and time.time() < end: - self.log.debug("Waiting for machine {} is destroyed".format(machine_id)) + self.log.debug( + "Waiting for machine {} is destroyed".format(machine_id) + ) await asyncio.sleep(0.5) machines = await model.get_machines() self.log.debug("Machine destroyed: {}".format(machine_id)) else: - self.log.debug('Machine not found: {}'.format(machine_id)) + self.log.debug("Machine not found: {}".format(machine_id)) - async def _juju_destroy_model( - self, - model_name: str, - total_timeout: float = None - ): + async def _juju_destroy_model(self, model_name: str, total_timeout: float = None): - self.log.debug('Destroying model {}'.format(model_name)) + self.log.debug("Destroying model {}".format(model_name)) if total_timeout is None: total_timeout = 3600 @@ -1310,22 +1467,20 @@ class N2VCJujuConnector(N2VCConnector): model = await self._juju_get_model(model_name=model_name) if not model: - raise N2VCNotFound( - message="Model {} does not exist".format(model_name) - ) + raise N2VCNotFound(message="Model {} does not exist".format(model_name)) uuid = model.info.uuid # destroy applications for application_name in model.applications: try: - await self._juju_destroy_application(model_name=model_name, application_name=application_name) + await self._juju_destroy_application( + model_name=model_name, application_name=application_name + ) except Exception as e: self.log.error( "Error destroying application {} in model {}: {}".format( - application_name, - model_name, - e + application_name, model_name, e ) ) @@ -1333,35 +1488,43 @@ class N2VCJujuConnector(N2VCConnector): machines = await model.get_machines() for machine_id in machines: try: - await self._juju_destroy_machine(model_name=model_name, machine_id=machine_id) + await self._juju_destroy_machine( + model_name=model_name, machine_id=machine_id + ) except asyncio.CancelledError: raise - except Exception as e: + except Exception: # ignore exceptions destroying machine pass await self._juju_disconnect_model(model_name=model_name) - self.log.debug('destroying model {}...'.format(model_name)) + self.log.debug("destroying model {}...".format(model_name)) await self.controller.destroy_model(uuid) # self.log.debug('model destroy requested {}'.format(model_name)) # wait for model is completely destroyed - self.log.debug('Waiting for model {} to be destroyed...'.format(model_name)) - last_exception = '' + self.log.debug("Waiting for model {} to be destroyed...".format(model_name)) + last_exception = "" while time.time() < end: try: # await self.controller.get_model(uuid) models = await self.controller.list_models() if model_name not in models: - self.log.debug('The model {} ({}) was destroyed'.format(model_name, uuid)) + self.log.debug( + "The model {} ({}) was destroyed".format(model_name, uuid) + ) return except asyncio.CancelledError: raise except Exception as e: last_exception = e await asyncio.sleep(5) - raise N2VCException("Timeout waiting for model {} to be destroyed {}".format(model_name, last_exception)) + raise N2VCException( + "Timeout waiting for model {} to be destroyed {}".format( + model_name, last_exception + ) + ) async def _juju_login(self): """Connect to juju controller @@ -1384,8 +1547,13 @@ class N2VCJujuConnector(N2VCConnector): try: self._connecting = True self.log.info( - 'connecting to juju controller: {} {}:{}{}' - .format(self.url, self.username, self.secret[:8] + '...', ' with ca_cert' if self.ca_cert else '')) + "connecting to juju controller: {} {}:{}{}".format( + self.url, + self.username, + self.secret[:8] + "...", + " with ca_cert" if self.ca_cert else "", + ) + ) # Create controller object self.controller = Controller(loop=self.loop) @@ -1394,17 +1562,14 @@ class N2VCJujuConnector(N2VCConnector): endpoint=self.url, username=self.username, password=self.secret, - cacert=self.ca_cert + cacert=self.ca_cert, ) self._authenticated = True - self.log.info('juju controller connected') + self.log.info("juju controller connected") except Exception as e: - message = 'Exception connecting to juju: {}'.format(e) + message = "Exception connecting to juju: {}".format(e) self.log.error(message) - raise N2VCConnectionException( - message=message, - url=self.url - ) + raise N2VCConnectionException(message=message, url=self.url) finally: self._connecting = False @@ -1418,30 +1583,31 @@ class N2VCJujuConnector(N2VCConnector): try: await self._juju_disconnect_model(model_name) except Exception as e: - self.log.error('Error disconnecting model {} : {}'.format(model_name, e)) + self.log.error( + "Error disconnecting model {} : {}".format(model_name, e) + ) # continue with next model... self.log.info("Disconnecting controller") try: await self.controller.disconnect() except Exception as e: - raise N2VCConnectionException(message='Error disconnecting controller: {}'.format(e), url=self.url) + raise N2VCConnectionException( + message="Error disconnecting controller: {}".format(e), url=self.url + ) self.controller = None self._authenticated = False - self.log.info('disconnected') + self.log.info("disconnected") - async def _juju_disconnect_model( - self, - model_name: str - ): + async def _juju_disconnect_model(self, model_name: str): self.log.debug("Disconnecting model {}".format(model_name)) if model_name in self.juju_models: await self.juju_models[model_name].disconnect() self.juju_models[model_name] = None self.juju_observers[model_name] = None else: - self.warning('Cannot disconnect model: {}'.format(model_name)) + self.warning("Cannot disconnect model: {}".format(model_name)) def _create_juju_public_key(self): """Recreate the Juju public key on lcm container, if needed @@ -1452,24 +1618,28 @@ class N2VCJujuConnector(N2VCConnector): # Make sure that we have a public key before writing to disk if self.public_key is None or len(self.public_key) == 0: - if 'OSMLCM_VCA_PUBKEY' in os.environ: - self.public_key = os.getenv('OSMLCM_VCA_PUBKEY', '') + if "OSMLCM_VCA_PUBKEY" in os.environ: + self.public_key = os.getenv("OSMLCM_VCA_PUBKEY", "") if len(self.public_key) == 0: return else: return - pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~')) + pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~")) file_path = "{}/juju_id_rsa.pub".format(pk_path) - self.log.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key)) + self.log.debug( + "writing juju public key to file:\n{}\npublic key: {}".format( + file_path, self.public_key + ) + ) if not os.path.exists(pk_path): # create path and write file os.makedirs(pk_path) - with open(file_path, 'w') as f: - self.log.debug('Creating juju public key file: {}'.format(file_path)) + with open(file_path, "w") as f: + self.log.debug("Creating juju public key file: {}".format(file_path)) f.write(self.public_key) else: - self.log.debug('juju public key file already exists: {}'.format(file_path)) + self.log.debug("juju public key file already exists: {}".format(file_path)) @staticmethod def _format_model_name(name: str) -> str: @@ -1478,7 +1648,7 @@ class N2VCJujuConnector(N2VCConnector): Model names may only contain lowercase letters, digits and hyphens """ - return name.replace('_', '-').replace(' ', '-').lower() + return name.replace("_", "-").replace(" ", "-").lower() @staticmethod def _format_app_name(name: str) -> str: @@ -1499,24 +1669,24 @@ class N2VCJujuConnector(N2VCConnector): return False return True - new_name = name.replace('_', '-') - new_name = new_name.replace(' ', '-') + new_name = name.replace("_", "-") + new_name = new_name.replace(" ", "-") new_name = new_name.lower() - while new_name.find('--') >= 0: - new_name = new_name.replace('--', '-') - groups = new_name.split('-') + while new_name.find("--") >= 0: + new_name = new_name.replace("--", "-") + groups = new_name.split("-") # find 'all numbers' groups and prefix them with a letter - app_name = '' + app_name = "" for i in range(len(groups)): group = groups[i] if all_numbers(group): - group = 'z' + group + group = "z" + group if i > 0: - app_name += '-' + app_name += "-" app_name += group if app_name[0].isdigit(): - app_name = 'z' + app_name + app_name = "z" + app_name return app_name diff --git a/n2vc/provisioner.py b/n2vc/provisioner.py index 33c13f1..5107242 100644 --- a/n2vc/provisioner.py +++ b/n2vc/provisioner.py @@ -15,15 +15,15 @@ import logging import os import re import shlex +from subprocess import CalledProcessError import tempfile import time import uuid -from subprocess import CalledProcessError -import paramiko +from juju.client import client import n2vc.exceptions +import paramiko -from juju.client import client arches = [ [re.compile(r"amd64|x86_64"), "amd64"], @@ -32,7 +32,6 @@ arches = [ [re.compile(r"aarch64"), "arm64"], [re.compile(r"ppc64|ppc64el|ppc64le"), "ppc64el"], [re.compile(r"s390x?"), "s390x"], - ] @@ -79,6 +78,7 @@ iptables -t nat -A OUTPUT -p tcp -d {} -j DNAT --to-destination {} netfilter-persistent save """ + class SSHProvisioner: """Provision a manually created machine via SSH.""" @@ -121,7 +121,7 @@ class SSHProvisioner: # Read the private key into a paramiko.RSAKey if os.path.exists(private_key_path): - with open(private_key_path, 'r') as f: + with open(private_key_path, "r") as f: pkey = paramiko.RSAKey.from_private_key(f) ####################################################################### @@ -155,7 +155,7 @@ class SSHProvisioner: ) break except paramiko.ssh_exception.SSHException as e: - if 'Error reading SSH protocol banner' == str(e): + if "Error reading SSH protocol banner" == str(e): # Once more, with feeling ssh.connect(host, port=22, username=user, pkey=pkey) else: @@ -163,8 +163,10 @@ class SSHProvisioner: self.log.debug("Unhandled exception caught: {}".format(e)) raise e except Exception as e: - if 'Unable to connect to port' in str(e): - self.log.debug("Waiting for VM to boot, sleeping {} seconds".format(delay)) + if "Unable to connect to port" in str(e): + self.log.debug( + "Waiting for VM to boot, sleeping {} seconds".format(delay) + ) if attempts > retry: raise e else: @@ -194,17 +196,16 @@ class SSHProvisioner: if type(cmd) is not list: cmd = [cmd] - cmds = ' '.join(cmd) - stdin, stdout, stderr = ssh.exec_command(cmds, get_pty=pty) + cmds = " ".join(cmd) + _, stdout, stderr = ssh.exec_command(cmds, get_pty=pty) retcode = stdout.channel.recv_exit_status() if retcode > 0: output = stderr.read().strip() - raise CalledProcessError(returncode=retcode, cmd=cmd, - output=output) + raise CalledProcessError(returncode=retcode, cmd=cmd, output=output) return ( - stdout.read().decode('utf-8').strip(), - stderr.read().decode('utf-8').strip() + stdout.read().decode("utf-8").strip(), + stderr.read().decode("utf-8").strip(), ) def _init_ubuntu_user(self): @@ -218,7 +219,7 @@ class SSHProvisioner: try: # Run w/o allocating a pty, so we fail if sudo prompts for a passwd ssh = self._get_ssh_client() - stdout, stderr = self._run_command(ssh, "sudo -n true", pty=False) + self._run_command(ssh, "sudo -n true", pty=False) except paramiko.ssh_exception.AuthenticationException: raise n2vc.exceptions.AuthenticationFailed(self.user) except paramiko.ssh_exception.NoValidConnectionsError: @@ -228,7 +229,6 @@ class SSHProvisioner: ssh.close() # Infer the public key - public_key = None public_key_path = "{}.pub".format(self.private_key_path) if not os.path.exists(public_key_path): @@ -245,9 +245,7 @@ class SSHProvisioner: ssh = self._get_ssh_client() self._run_command( - ssh, - ["sudo", "/bin/bash -c " + shlex.quote(script)], - pty=True + ssh, ["sudo", "/bin/bash -c " + shlex.quote(script)], pty=True ) except paramiko.ssh_exception.AuthenticationException as e: raise e @@ -264,32 +262,30 @@ class SSHProvisioner: """ info = { - 'series': '', - 'arch': '', - 'cpu-cores': '', - 'mem': '', + "series": "", + "arch": "", + "cpu-cores": "", + "mem": "", } - stdout, stderr = self._run_command( - ssh, - ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], - pty=True, + stdout, _ = self._run_command( + ssh, ["sudo", "/bin/bash -c " + shlex.quote(DETECTION_SCRIPT)], pty=True, ) lines = stdout.split("\n") # Remove extraneous line if DNS resolution of hostname famils # i.e. sudo: unable to resolve host test-1-mgmtvm-1: Connection timed out - if 'unable to resolve host' in lines[0]: + if "unable to resolve host" in lines[0]: lines = lines[1:] - info['series'] = lines[0].strip() - info['arch'] = normalize_arch(lines[1].strip()) + info["series"] = lines[0].strip() + info["arch"] = normalize_arch(lines[1].strip()) - memKb = re.split(r'\s+', lines[2])[1] + memKb = re.split(r"\s+", lines[2])[1] # Convert megabytes -> kilobytes - info['mem'] = round(int(memKb) / 1024) + info["mem"] = round(int(memKb) / 1024) # Detect available CPUs recorded = {} @@ -302,7 +298,7 @@ class SSHProvisioner: cores = line.split(":")[1].strip() if physical_id not in recorded.keys(): - info['cpu-cores'] += cores + info["cpu-cores"] += cores recorded[physical_id] = True return info @@ -321,22 +317,19 @@ class SSHProvisioner: ssh = self._get_ssh_client() hw = self._detect_hardware_and_os(ssh) - params.series = hw['series'] + params.series = hw["series"] params.instance_id = "manual:{}".format(self.host) params.nonce = "manual:{}:{}".format( - self.host, - str(uuid.uuid4()), # a nop for Juju w/manual machines + self.host, str(uuid.uuid4()), # a nop for Juju w/manual machines ) params.hardware_characteristics = { - 'arch': hw['arch'], - 'mem': int(hw['mem']), - 'cpu-cores': int(hw['cpu-cores']), + "arch": hw["arch"], + "mem": int(hw["mem"]), + "cpu-cores": int(hw["cpu-cores"]), } - params.addresses = [{ - 'value': self.host, - 'type': 'ipv4', - 'scope': 'public', - }] + params.addresses = [ + {"value": self.host, "type": "ipv4", "scope": "public"} + ] except paramiko.ssh_exception.AuthenticationException as e: raise e @@ -378,7 +371,7 @@ class SSHProvisioner: - 127.0.0.1:17070 - '[::1]:17070' """ - m = re.search('apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070', results.script) + m = re.search(r"apiaddresses:\n- (\d+\.\d+\.\d+\.\d+):17070", results.script) apiaddress = m.group(1) """Add IP Table rule @@ -405,20 +398,18 @@ class SSHProvisioner: self._run_configure_script(script) break except Exception as e: - self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay)) - if attempts > retry: - raise e - else: - time.sleep(delay) - # Slowly back off the retry - delay += 15 + self.log.debug("Waiting for dpkg, sleeping {} seconds".format(delay)) + if attempts > retry: + raise e + else: + time.sleep(delay) + # Slowly back off the retry + delay += 15 # self.log.debug("Running configure script") self._run_configure_script(results.script) # self.log.debug("Configure script finished") - - def _run_configure_script(self, script: str): """Run the script to install the Juju agent on the target machine. @@ -427,25 +418,21 @@ class SSHProvisioner: if the upload fails """ _, tmpFile = tempfile.mkstemp() - with open(tmpFile, 'w') as f: + with open(tmpFile, "w") as f: f.write(script) try: # get ssh client - ssh = self._get_ssh_client( - user="ubuntu", - ) + ssh = self._get_ssh_client(user="ubuntu",) # copy the local copy of the script to the remote machine sftp = paramiko.SFTPClient.from_transport(ssh.get_transport()) sftp.put( - tmpFile, - tmpFile, + tmpFile, tmpFile, ) # run the provisioning script - stdout, stderr = self._run_command( - ssh, - "sudo /bin/bash {}".format(tmpFile), + self._run_command( + ssh, "sudo /bin/bash {}".format(tmpFile), ) except paramiko.ssh_exception.AuthenticationException as e: diff --git a/n2vc/tests/__init__.py b/n2vc/tests/__init__.py new file mode 100644 index 0000000..ec4fe4b --- /dev/null +++ b/n2vc/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/n2vc/tests/unit/__init__.py b/n2vc/tests/unit/__init__.py new file mode 100644 index 0000000..ec4fe4b --- /dev/null +++ b/n2vc/tests/unit/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/n2vc/tests/unit/test_provisioner.py b/n2vc/tests/unit/test_provisioner.py new file mode 100644 index 0000000..880c5cb --- /dev/null +++ b/n2vc/tests/unit/test_provisioner.py @@ -0,0 +1,158 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase, mock + +from mock import mock_open +from n2vc.provisioner import SSHProvisioner +from paramiko.ssh_exception import SSHException + + +class ProvisionerTest(TestCase): + def setUp(self): + self.provisioner = SSHProvisioner(None, None, None) + + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client(self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os): + mock_instance = mock_sshclient.return_value + sshclient = self.provisioner._get_ssh_client() + self.assertEqual(mock_instance, sshclient) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual(1, mock_instance.connect.call_count, "Connect call count") + + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_no_connection( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os + ): + + mock_instance = mock_sshclient.return_value + mock_instance.method_inside_someobject.side_effect = ["something"] + mock_instance.connect.side_effect = SSHException() + + self.assertRaises(SSHException, self.provisioner._get_ssh_client) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual(1, mock_instance.connect.call_count, "Connect call count") + + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_bad_banner( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os + ): + + mock_instance = mock_sshclient.return_value + mock_instance.method_inside_someobject.side_effect = ["something"] + mock_instance.connect.side_effect = [ + SSHException("Error reading SSH protocol banner"), + None, + None, + ] + + sshclient = self.provisioner._get_ssh_client() + self.assertEqual(mock_instance, sshclient) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual( + 3, mock_instance.connect.call_count, "Should attempt 3 connections" + ) + + @mock.patch("time.sleep", autospec=True) + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_unable_to_connect( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep + ): + + mock_instance = mock_sshclient.return_value + mock_instance.connect.side_effect = Exception("Unable to connect to port") + + self.assertRaises(Exception, self.provisioner._get_ssh_client) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual( + 11, mock_instance.connect.call_count, "Should attempt 11 connections" + ) + + @mock.patch("time.sleep", autospec=True) + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_unable_to_connect_once( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os, _mock_sleep + ): + + mock_instance = mock_sshclient.return_value + mock_instance.connect.side_effect = [ + Exception("Unable to connect to port"), + None, + ] + + sshclient = self.provisioner._get_ssh_client() + self.assertEqual(mock_instance, sshclient) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual( + 2, mock_instance.connect.call_count, "Should attempt 2 connections" + ) + + @mock.patch("n2vc.provisioner.os.path.exists") + @mock.patch("n2vc.provisioner.paramiko.RSAKey") + @mock.patch("n2vc.provisioner.paramiko.SSHClient") + @mock.patch("builtins.open", new_callable=mock_open, read_data="data") + def test__get_ssh_client_other_exception( + self, _mock_open, mock_sshclient, _mock_rsakey, _mock_os + ): + + mock_instance = mock_sshclient.return_value + mock_instance.connect.side_effect = Exception() + + self.assertRaises(Exception, self.provisioner._get_ssh_client) + self.assertEqual( + 1, + mock_instance.set_missing_host_key_policy.call_count, + "Missing host key call count", + ) + self.assertEqual( + 1, mock_instance.connect.call_count, "Should only attempt 1 connection" + ) + + +# diff --git a/n2vc/vnf.py b/n2vc/vnf.py index 9441f4a..4e46746 100644 --- a/n2vc/vnf.py +++ b/n2vc/vnf.py @@ -16,17 +16,22 @@ import asyncio import base64 import binascii import logging -import os import os.path import re import shlex import ssl import subprocess -import sys -# import time + +from juju.client import client +from juju.controller import Controller +from juju.errors import JujuAPIError, JujuError +from juju.model import ModelObserver + import n2vc.exceptions from n2vc.provisioner import SSHProvisioner + +# import time # FIXME: this should load the juju inside or modules without having to # explicitly install it. Check why it's not working. # Load our subtree of the juju library @@ -34,13 +39,6 @@ from n2vc.provisioner import SSHProvisioner # path = os.path.join(path, "modules/libjuju/") # if path not in sys.path: # sys.path.insert(1, path) - -from juju.client import client -from juju.controller import Controller -from juju.model import ModelObserver -from juju.errors import JujuAPIError, JujuError - - # We might need this to connect to the websocket securely, but test and verify. try: ssl._create_default_https_context = ssl._create_unverified_context @@ -73,14 +71,15 @@ class PrimitiveDoesNotExist(Exception): # Quiet the debug logging -logging.getLogger('websockets.protocol').setLevel(logging.INFO) -logging.getLogger('juju.client.connection').setLevel(logging.WARN) -logging.getLogger('juju.model').setLevel(logging.WARN) -logging.getLogger('juju.machine').setLevel(logging.WARN) +logging.getLogger("websockets.protocol").setLevel(logging.INFO) +logging.getLogger("juju.client.connection").setLevel(logging.WARN) +logging.getLogger("juju.model").setLevel(logging.WARN) +logging.getLogger("juju.machine").setLevel(logging.WARN) class VCAMonitor(ModelObserver): """Monitor state changes within the Juju Model.""" + log = None def __init__(self, ns_name): @@ -92,8 +91,8 @@ class VCAMonitor(ModelObserver): def AddApplication(self, application_name, callback, *callback_args): if application_name not in self.applications: self.applications[application_name] = { - 'callback': callback, - 'callback_args': callback_args + "callback": callback, + "callback_args": callback_args, } def RemoveApplication(self, application_name): @@ -105,36 +104,37 @@ class VCAMonitor(ModelObserver): if delta.entity == "unit": # Ignore change events from other applications - if delta.data['application'] not in self.applications.keys(): + if delta.data["application"] not in self.applications.keys(): return try: - application_name = delta.data['application'] + application_name = delta.data["application"] - callback = self.applications[application_name]['callback'] - callback_args = \ - self.applications[application_name]['callback_args'] + callback = self.applications[application_name]["callback"] + callback_args = self.applications[application_name]["callback_args"] if old and new: # Fire off a callback with the application state if callback: callback( self.ns_name, - delta.data['application'], + delta.data["application"], new.workload_status, new.workload_status_message, - *callback_args) + *callback_args, + ) if old and not new: # This is a charm being removed if callback: callback( self.ns_name, - delta.data['application'], + delta.data["application"], "removed", "", - *callback_args) + *callback_args, + ) except Exception as e: self.log.debug("[1] notify_callback exception: {}".format(e)) @@ -156,6 +156,7 @@ class VCAMonitor(ModelObserver): pass + ######## # TODO # @@ -164,18 +165,19 @@ class VCAMonitor(ModelObserver): class N2VC: - def __init__(self, - log=None, - server='127.0.0.1', - port=17070, - user='admin', - secret=None, - artifacts=None, - loop=None, - juju_public_key=None, - ca_cert=None, - api_proxy=None - ): + def __init__( + self, + log=None, + server="127.0.0.1", + port=17070, + user="admin", + secret=None, + artifacts=None, + loop=None, + juju_public_key=None, + ca_cert=None, + api_proxy=None, + ): """Initialize N2VC Initializes the N2VC object, allowing the caller to interoperate with the VCA. @@ -223,8 +225,8 @@ class N2VC: # For debugging self.refcount = { - 'controller': 0, - 'model': 0, + "controller": 0, + "model": 0, } self.models = {} @@ -242,7 +244,7 @@ class N2VC: if juju_public_key: self._create_juju_public_key(juju_public_key) else: - self.juju_public_key = '' + self.juju_public_key = "" # TODO: Verify ca_cert is valid before using. VCA will crash # if the ca_cert isn't formatted correctly. @@ -255,14 +257,10 @@ class N2VC: try: cacert = base64.b64decode(b64string).decode("utf-8") - cacert = re.sub( - r'\\n', - r'\n', - cacert, - ) + cacert = re.sub(r"\\n", r"\n", cacert,) except binascii.Error as e: self.log.debug("Caught binascii.Error: {}".format(e)) - raise n2vc.exceptions.InvalidCACertificate("Invalid CA Certificate") + raise n2vc.exceptions.N2VCInvalidCertificate("Invalid CA Certificate") return cacert @@ -270,25 +268,24 @@ class N2VC: if ca_cert: self.ca_cert = base64_to_cacert(ca_cert) - # Quiet websocket traffic - logging.getLogger('websockets.protocol').setLevel(logging.INFO) - logging.getLogger('juju.client.connection').setLevel(logging.WARN) - logging.getLogger('model').setLevel(logging.WARN) + logging.getLogger("websockets.protocol").setLevel(logging.INFO) + logging.getLogger("juju.client.connection").setLevel(logging.WARN) + logging.getLogger("model").setLevel(logging.WARN) # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG) - self.log.debug('JujuApi: instantiated') + self.log.debug("JujuApi: instantiated") self.server = server self.port = port self.secret = secret - if user.startswith('user-'): + if user.startswith("user-"): self.user = user else: - self.user = 'user-{}'.format(user) + self.user = "user-{}".format(user) - self.endpoint = '%s:%d' % (server, int(port)) + self.endpoint = "%s:%d" % (server, int(port)) self.artifacts = artifacts @@ -307,31 +304,33 @@ class N2VC: """ # Make sure that we have a public key before writing to disk if public_key is None or len(public_key) == 0: - if 'OSM_VCA_PUBKEY' in os.environ: - public_key = os.getenv('OSM_VCA_PUBKEY', '') + if "OSM_VCA_PUBKEY" in os.environ: + public_key = os.getenv("OSM_VCA_PUBKEY", "") if len(public_key == 0): return else: return - path = "{}/.local/share/juju/ssh".format( - os.path.expanduser('~'), - ) + path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~"),) if not os.path.exists(path): os.makedirs(path) - with open('{}/juju_id_rsa.pub'.format(path), 'w') as f: + with open("{}/juju_id_rsa.pub".format(path), "w") as f: f.write(public_key) - def notify_callback(self, model_name, application_name, status, message, - callback=None, *callback_args): + def notify_callback( + self, + model_name, + application_name, + status, + message, + callback=None, + *callback_args + ): try: if callback: callback( - model_name, - application_name, - status, message, - *callback_args, + model_name, application_name, status, message, *callback_args, ) except Exception as e: self.log.error("[0] notify_callback exception {}".format(e)) @@ -342,7 +341,8 @@ class N2VC: async def Relate(self, model_name, vnfd): """Create a relation between the charm-enabled VDUs in a VNF. - The Relation mapping has two parts: the id of the vdu owning the endpoint, and the name of the endpoint. + The Relation mapping has two parts: the id of the vdu owning the endpoint, and + the name of the endpoint. vdu: ... @@ -351,7 +351,9 @@ class N2VC: - provides: dataVM:db requires: mgmtVM:app - This tells N2VC that the charm referred to by the dataVM vdu offers a relation named 'db', and the mgmtVM vdu has an 'app' endpoint that should be connected to a database. + This tells N2VC that the charm referred to by the dataVM vdu offers a relation + named 'db', and the mgmtVM vdu + has an 'app' endpoint that should be connected to a database. :param str ns_name: The name of the network service. :param dict vnfd: The parsed yaml VNF descriptor. @@ -366,29 +368,27 @@ class N2VC: configs = [] vnf_config = vnfd.get("vnf-configuration") if vnf_config: - juju = vnf_config['juju'] + juju = vnf_config["juju"] if juju: configs.append(vnf_config) - for vdu in vnfd['vdu']: - vdu_config = vdu.get('vdu-configuration') + for vdu in vnfd["vdu"]: + vdu_config = vdu.get("vdu-configuration") if vdu_config: - juju = vdu_config['juju'] + juju = vdu_config["juju"] if juju: configs.append(vdu_config) def _get_application_name(name): """Get the application name that's mapped to a vnf/vdu.""" vnf_member_index = 0 - vnf_name = vnfd['name'] + vnf_name = vnfd["name"] - for vdu in vnfd.get('vdu'): + for vdu in vnfd.get("vdu"): # Compare the named portion of the relation to the vdu's id - if vdu['id'] == name: + if vdu["id"] == name: application_name = self.FormatApplicationName( - model_name, - vnf_name, - str(vnf_member_index), + model_name, vnf_name, str(vnf_member_index), ) return application_name else: @@ -398,46 +398,48 @@ class N2VC: # Loop through relations for cfg in configs: - if 'juju' in cfg: - juju = cfg['juju'] - if 'vca-relationships' in juju and 'relation' in juju['vca-relationships']: - for rel in juju['vca-relationships']['relation']: + if "juju" in cfg: + juju = cfg["juju"] + if ( + "vca-relationships" in juju + and "relation" in juju["vca-relationships"] + ): + for rel in juju["vca-relationships"]["relation"]: try: # get the application name for the provides - (name, endpoint) = rel['provides'].split(':') + (name, endpoint) = rel["provides"].split(":") application_name = _get_application_name(name) - provides = "{}:{}".format( - application_name, - endpoint - ) + provides = "{}:{}".format(application_name, endpoint) # get the application name for thr requires - (name, endpoint) = rel['requires'].split(':') + (name, endpoint) = rel["requires"].split(":") application_name = _get_application_name(name) - requires = "{}:{}".format( - application_name, - endpoint + requires = "{}:{}".format(application_name, endpoint) + self.log.debug( + "Relation: {} <-> {}".format(provides, requires) ) - self.log.debug("Relation: {} <-> {}".format( - provides, - requires - )) await self.add_relation( - model_name, - provides, - requires, + model_name, provides, requires, ) except Exception as e: self.log.debug("Exception: {}".format(e)) return - async def DeployCharms(self, model_name, application_name, vnfd, - charm_path, params={}, machine_spec={}, - callback=None, *callback_args): + async def DeployCharms( + self, + model_name, + application_name, + vnfd, + charm_path, + params={}, + machine_spec={}, + callback=None, + *callback_args + ): """Deploy one or more charms associated with a VNF. Deploy the charm(s) referenced in a VNF Descriptor. @@ -452,7 +454,8 @@ class N2VC: 'rw_mgmt_ip': '1.2.3.4', # Pass the initial-config-primitives section of the vnf or vdu 'initial-config-primitives': {...} - 'user_values': dictionary with the day-1 parameters provided at instantiation time. It will replace values + 'user_values': dictionary with the day-1 parameters provided at + instantiation time. It will replace values inside < >. rw_mgmt_ip will be included here also } :param dict machine_spec: A dictionary describing the machine to @@ -499,15 +502,20 @@ class N2VC: ######################################## app = await self.get_application(model, application_name) if app: - raise JujuApplicationExists("Can't deploy application \"{}\" to model \"{}\" because it already exists.".format(application_name, model_name)) + raise JujuApplicationExists( + ( + 'Can\'t deploy application "{}" to model ' + ' "{}" because it already exists.' + ).format(application_name, model_name) + ) ################################################################ # Register this application with the model-level event monitor # ################################################################ if callback: - self.log.debug("JujuApi: Registering callback for {}".format( - application_name, - )) + self.log.debug( + "JujuApi: Registering callback for {}".format(application_name,) + ) await self.Subscribe(model_name, application_name, callback, *callback_args) ####################################### @@ -515,15 +523,14 @@ class N2VC: ####################################### rw_mgmt_ip = None - if 'rw_mgmt_ip' in params: - rw_mgmt_ip = params['rw_mgmt_ip'] + if "rw_mgmt_ip" in params: + rw_mgmt_ip = params["rw_mgmt_ip"] - if 'initial-config-primitive' not in params: - params['initial-config-primitive'] = {} + if "initial-config-primitive" not in params: + params["initial-config-primitive"] = {} initial_config = self._get_config_from_dict( - params['initial-config-primitive'], - {'': rw_mgmt_ip} + params["initial-config-primitive"], {"": rw_mgmt_ip} ) ######################################################## @@ -533,15 +540,16 @@ class N2VC: series = "xenial" if machine_spec.keys(): - if all(k in machine_spec for k in ['hostname', 'username']): + if all(k in machine_spec for k in ["hostname", "username"]): # Allow series to be derived from the native charm series = None - self.log.debug("Provisioning manual machine {}@{}".format( - machine_spec['username'], - machine_spec['hostname'], - )) + self.log.debug( + "Provisioning manual machine {}@{}".format( + machine_spec["username"], machine_spec["hostname"], + ) + ) """Native Charm support @@ -557,15 +565,16 @@ class N2VC: to = await self.provision_machine( model_name=model_name, - username=machine_spec['username'], - hostname=machine_spec['hostname'], + username=machine_spec["username"], + hostname=machine_spec["hostname"], private_key_path=self.GetPrivateKeyPath(), ) self.log.debug("Provisioned machine id {}".format(to)) # TODO: If to is none, raise an exception - # The native charm won't have the sshproxy layer, typically, but LCM uses the config primitive + # The native charm won't have the sshproxy layer, typically, but LCM + # uses the config primitive # to interpret what the values are. That's a gap to fill. """ @@ -583,17 +592,16 @@ class N2VC: # Native charms don't include the ssh-* config values, so strip them # from the initial_config, otherwise the deploy will raise an error. # self.log.debug("Removing ssh-* from initial-config") - for k in ['ssh-hostname', 'ssh-username', 'ssh-password']: + for k in ["ssh-hostname", "ssh-username", "ssh-password"]: if k in initial_config: self.log.debug("Removing parameter {}".format(k)) del initial_config[k] - self.log.debug("JujuApi: Deploying charm ({}/{}) from {} to {}".format( - model_name, - application_name, - charm_path, - to, - )) + self.log.debug( + "JujuApi: Deploying charm ({}/{}) from {} to {}".format( + model_name, application_name, charm_path, to, + ) + ) ######################################################## # Deploy the charm and apply the initial configuration # @@ -621,7 +629,7 @@ class N2VC: except KeyError as ex: # We don't currently support relations between NS and VNF/VDU charms self.log.warn("[N2VC] Relations not supported: {}".format(ex)) - except Exception as ex: + except Exception: # This may happen if not all of the charms needed by the relation # are ready. We can safely ignore this, because Relate will be # retried when the endpoint of the relation is deployed. @@ -631,9 +639,7 @@ class N2VC: # # Execute initial config primitive(s) # # ####################################### uuids = await self.ExecuteInitialPrimitives( - model_name, - application_name, - params, + model_name, application_name, params, ) return uuids @@ -752,7 +758,7 @@ class N2VC: # raise N2VCPrimitiveExecutionFailed(e) def GetPrivateKeyPath(self): - homedir = os.environ['HOME'] + homedir = os.environ["HOME"] sshdir = "{}/.ssh".format(homedir) private_key_path = "{}/id_n2vc_rsa".format(sshdir) return private_key_path @@ -768,10 +774,10 @@ class N2VC: Juju, after which Juju will communicate with the VM directly via the juju agent. """ - public_key = "" + # public_key = "" # Find the path to where we expect our key to live. - homedir = os.environ['HOME'] + homedir = os.environ["HOME"] sshdir = "{}/.ssh".format(homedir) if not os.path.exists(sshdir): os.mkdir(sshdir) @@ -782,9 +788,7 @@ class N2VC: # If we don't have a key generated, generate it. if not os.path.exists(private_key_path): cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format( - "rsa", - "4096", - private_key_path + "rsa", "4096", private_key_path ) subprocess.check_output(shlex.split(cmd)) @@ -794,8 +798,9 @@ class N2VC: return public_key - async def ExecuteInitialPrimitives(self, model_name, application_name, - params, callback=None, *callback_args): + async def ExecuteInitialPrimitives( + self, model_name, application_name, params, callback=None, *callback_args + ): """Execute multiple primitives. Execute multiple primitives as declared in initial-config-primitive. @@ -807,59 +812,71 @@ class N2VC: primitives = {} # Build a sequential list of the primitives to execute - for primitive in params['initial-config-primitive']: + for primitive in params["initial-config-primitive"]: try: - if primitive['name'] == 'config': + if primitive["name"] == "config": pass else: - seq = primitive['seq'] + seq = primitive["seq"] params_ = {} - if 'parameter' in primitive: - params_ = primitive['parameter'] + if "parameter" in primitive: + params_ = primitive["parameter"] user_values = params.get("user_values", {}) - if 'rw_mgmt_ip' not in user_values: - user_values['rw_mgmt_ip'] = None - # just for backward compatibility, because it will be provided always by modern version of LCM + if "rw_mgmt_ip" not in user_values: + user_values["rw_mgmt_ip"] = None + # just for backward compatibility, because it will be provided + # always by modern version of LCM primitives[seq] = { - 'name': primitive['name'], - 'parameters': self._map_primitive_parameters( - params_, - user_values + "name": primitive["name"], + "parameters": self._map_primitive_parameters( + params_, user_values ), } for primitive in sorted(primitives): try: - # self.log.debug("Queuing action {}".format(primitives[primitive]['name'])) + # self.log.debug("Queuing action {}".format( + # primitives[primitive]['name'])) uuids.append( await self.ExecutePrimitive( model_name, application_name, - primitives[primitive]['name'], + primitives[primitive]["name"], callback, callback_args, - **primitives[primitive]['parameters'], + **primitives[primitive]["parameters"], ) ) except PrimitiveDoesNotExist as e: - self.log.debug("Ignoring exception PrimitiveDoesNotExist: {}".format(e)) + self.log.debug( + "Ignoring exception PrimitiveDoesNotExist: {}".format(e) + ) pass except Exception as e: - self.log.debug("XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}".format(e)) + self.log.debug( + ( + "XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}" + ).format(e) + ) raise e except N2VCPrimitiveExecutionFailed as e: - self.log.debug( - "[N2VC] Exception executing primitive: {}".format(e) - ) + self.log.debug("[N2VC] Exception executing primitive: {}".format(e)) raise return uuids - async def ExecutePrimitive(self, model_name, application_name, primitive, - callback, *callback_args, **params): + async def ExecutePrimitive( + self, + model_name, + application_name, + primitive, + callback, + *callback_args, + **params + ): """Execute a primitive of a charm for Day 1 or Day 2 configuration. Execute a primitive defined in the VNF descriptor. @@ -887,12 +904,10 @@ class N2VC: model = await self.get_model(model_name) - if primitive == 'config': + if primitive == "config": # config is special, and expecting params to be a dictionary await self.set_config( - model, - application_name, - params['params'], + model, application_name, params["params"], ) else: app = await self.get_application(model, application_name) @@ -901,7 +916,9 @@ class N2VC: actions = await app.get_actions() if primitive not in actions.keys(): - raise PrimitiveDoesNotExist("Primitive {} does not exist".format(primitive)) + raise PrimitiveDoesNotExist( + "Primitive {} does not exist".format(primitive) + ) # Run against the first (and probably only) unit in the app unit = app.units[0] @@ -913,14 +930,13 @@ class N2VC: raise e except Exception as e: # An unexpected exception was caught - self.log.debug( - "Caught exception while executing primitive: {}".format(e) - ) + self.log.debug("Caught exception while executing primitive: {}".format(e)) raise N2VCPrimitiveExecutionFailed(e) return uuid - async def RemoveCharms(self, model_name, application_name, callback=None, - *callback_args): + async def RemoveCharms( + self, model_name, application_name, callback=None, *callback_args + ): """Remove a charm from the VCA. Remove a charm referenced in a VNF Descriptor. @@ -941,10 +957,9 @@ class N2VC: # Remove this application from event monitoring await self.Unsubscribe(model_name, application_name) - # self.notify_callback(model_name, application_name, "removing", callback, *callback_args) - self.log.debug( - "Removing the application {}".format(application_name) - ) + # self.notify_callback(model_name, application_name, "removing", + # callback, *callback_args) + self.log.debug("Removing the application {}".format(application_name)) await app.remove() # await self.disconnect_model(self.monitors[model_name]) @@ -1018,19 +1033,21 @@ class N2VC: try: await model.block_until( lambda: all( - unit.workload_status in ['terminated'] for unit in app.units + unit.workload_status in ["terminated"] for unit in app.units ), - timeout=timeout + timeout=timeout, + ) + except Exception: + self.log.debug( + "Timed out waiting for {} to terminate.".format(application) ) - except Exception as e: - self.log.debug("Timed out waiting for {} to terminate.".format(application)) for machine in model.machines: try: self.log.debug("Destroying machine {}".format(machine)) await model.machines[machine].destroy(force=True) except JujuAPIError as e: - if 'does not exist' in str(e): + if "does not exist" in str(e): # Our cached model may be stale, because the machine # has already been removed. It's safe to continue. continue @@ -1085,9 +1102,7 @@ class N2VC: the callback method """ self.monitors[ns_name].AddApplication( - application_name, - callback, - *callback_args + application_name, callback, *callback_args ) async def Unsubscribe(self, ns_name, application_name): @@ -1098,9 +1113,7 @@ class N2VC: :param ns_name str: The name of the Network Service :param application_name str: The name of the application """ - self.monitors[ns_name].RemoveApplication( - application_name, - ) + self.monitors[ns_name].RemoveApplication(application_name,) # Non-public methods async def add_relation(self, model_name, relation1, relation2): @@ -1122,9 +1135,9 @@ class N2VC: # If one of the applications in the relationship doesn't exist, # or the relation has already been added, let the operation fail # silently. - if 'not found' in e.message: + if "not found" in e.message: return - if 'already exists' in e.message: + if "already exists" in e.message: return raise e @@ -1147,42 +1160,45 @@ class N2VC: """ config = {} for primitive in config_primitive: - if primitive['name'] == 'config': + if primitive["name"] == "config": # config = self._map_primitive_parameters() - for parameter in primitive['parameter']: - param = str(parameter['name']) - if parameter['value'] == "": - config[param] = str(values[parameter['value']]) + for parameter in primitive["parameter"]: + param = str(parameter["name"]) + if parameter["value"] == "": + config[param] = str(values[parameter["value"]]) else: - config[param] = str(parameter['value']) + config[param] = str(parameter["value"]) return config def _map_primitive_parameters(self, parameters, user_values): params = {} for parameter in parameters: - param = str(parameter['name']) - value = parameter.get('value') + param = str(parameter["name"]) + value = parameter.get("value") - # map parameters inside a < >; e.g. . with the provided user_values. + # map parameters inside a < >; e.g. . with the provided user + # _values. # Must exist at user_values except if there is a default value if isinstance(value, str) and value.startswith("<") and value.endswith(">"): - if parameter['value'][1:-1] in user_values: - value = user_values[parameter['value'][1:-1]] - elif 'default-value' in parameter: - value = parameter['default-value'] + if parameter["value"][1:-1] in user_values: + value = user_values[parameter["value"][1:-1]] + elif "default-value" in parameter: + value = parameter["default-value"] else: - raise KeyError("parameter {}='{}' not supplied ".format(param, value)) + raise KeyError( + "parameter {}='{}' not supplied ".format(param, value) + ) # If there's no value, use the default-value (if set) - if value is None and 'default-value' in parameter: - value = parameter['default-value'] + if value is None and "default-value" in parameter: + value = parameter["default-value"] # Typecast parameter value, if present paramtype = "string" try: - if 'data-type' in parameter: - paramtype = str(parameter['data-type']).lower() + if "data-type" in parameter: + paramtype = str(parameter["data-type"]).lower() if paramtype == "integer": value = int(value) @@ -1194,7 +1210,11 @@ class N2VC: # If there's no data-type, assume the value is a string value = str(value) except ValueError: - raise ValueError("parameter {}='{}' cannot be converted to type {}".format(param, value, paramtype)) + raise ValueError( + "parameter {}='{}' cannot be converted to type {}".format( + param, value, paramtype + ) + ) params[param] = value return params @@ -1203,13 +1223,13 @@ class N2VC: """Transform the yang config primitive to dict.""" config = {} for primitive in config_primitive.values(): - if primitive['name'] == 'config': - for parameter in primitive['parameter'].values(): - param = str(parameter['name']) - if parameter['value'] == "": - config[param] = str(values[parameter['value']]) + if primitive["name"] == "config": + for parameter in primitive["parameter"].values(): + param = str(parameter["name"]) + if parameter["value"] == "": + config[param] = str(values[parameter["value"]]) else: - config[param] = str(parameter['value']) + config[param] = str(parameter["value"]) return config @@ -1235,7 +1255,7 @@ class N2VC: elif not c.isalpha(): c = "-" appname += c - return re.sub('-+', '-', appname.lower()) + return re.sub("-+", "-", appname.lower()) # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0): # """Format the name of the application @@ -1260,7 +1280,7 @@ class N2VC: Model names may only contain lowercase letters, digits and hyphens """ - return name.replace('_', '-').lower() + return name.replace("_", "-").lower() async def get_application(self, model, application): """Get the deployed application.""" @@ -1290,19 +1310,15 @@ class N2VC: if model_name not in models: try: self.models[model_name] = await self.controller.add_model( - model_name, - config={'authorized-keys': self.juju_public_key} - + model_name, config={"authorized-keys": self.juju_public_key} ) except JujuError as e: if "already exists" not in e.message: raise e else: - self.models[model_name] = await self.controller.get_model( - model_name - ) + self.models[model_name] = await self.controller.get_model(model_name) - self.refcount['model'] += 1 + self.refcount["model"] += 1 # Create an observer for this model await self.create_model_monitor(model_name) @@ -1335,9 +1351,7 @@ class N2VC: if self.secret: self.log.debug( "Connecting to controller... ws://{} as {}/{}".format( - self.endpoint, - self.user, - self.secret, + self.endpoint, self.user, self.secret, ) ) try: @@ -1347,7 +1361,7 @@ class N2VC: password=self.secret, cacert=self.ca_cert, ) - self.refcount['controller'] += 1 + self.refcount["controller"] += 1 self.authenticated = True self.log.debug("JujuApi: Logged into controller") except Exception as ex: @@ -1364,7 +1378,6 @@ class N2VC: self.log.fatal("VCA credentials not configured.") self.authenticated = False - async def logout(self): """Logout of the Juju controller.""" if not self.authenticated: @@ -1375,11 +1388,9 @@ class N2VC: await self.disconnect_model(model) if self.controller: - self.log.debug("Disconnecting controller {}".format( - self.controller - )) + self.log.debug("Disconnecting controller {}".format(self.controller)) await self.controller.disconnect() - self.refcount['controller'] -= 1 + self.refcount["controller"] -= 1 self.controller = None self.authenticated = False @@ -1387,9 +1398,7 @@ class N2VC: self.log.debug(self.refcount) except Exception as e: - self.log.fatal( - "Fatal error logging out of Juju Controller: {}".format(e) - ) + self.log.fatal("Fatal error logging out of Juju Controller: {}".format(e)) raise e return True @@ -1398,14 +1407,14 @@ class N2VC: if model in self.models: try: await self.models[model].disconnect() - self.refcount['model'] -= 1 + self.refcount["model"] -= 1 self.models[model] = None except Exception as e: self.log.debug("Caught exception: {}".format(e)) - async def provision_machine(self, model_name: str, - hostname: str, username: str, - private_key_path: str) -> int: + async def provision_machine( + self, model_name: str, hostname: str, username: str, private_key_path: str + ) -> int: """Provision a machine. This executes the SSH provisioner, which will log in to a machine via @@ -1414,8 +1423,10 @@ class N2VC: :param model_name str: The name of the model :param hostname str: The IP or hostname of the target VM :param user str: The username to login to - :param private_key_path str: The path to the private key that's been injected to the VM via cloud-init - :return machine_id int: Returns the id of the machine or None if provisioning fails + :param private_key_path str: The path to the private key that's been injected + to the VM via cloud-init + :return machine_id int: Returns the id of the machine or None if provisioning + fails """ if not self.authenticated: await self.login() @@ -1423,11 +1434,11 @@ class N2VC: machine_id = None if self.api_proxy: - self.log.debug("Instantiating SSH Provisioner for {}@{} ({})".format( - username, - hostname, - private_key_path - )) + self.log.debug( + "Instantiating SSH Provisioner for {}@{} ({})".format( + username, hostname, private_key_path + ) + ) provisioner = SSHProvisioner( host=hostname, user=username, @@ -1443,7 +1454,7 @@ class N2VC: return None if params: - params.jobs = ['JobHostUnits'] + params.jobs = ["JobHostUnits"] model = await self.get_model(model_name) @@ -1463,10 +1474,7 @@ class N2VC: # as we need the machine_id self.log.debug("Installing Juju agent") await provisioner.install_agent( - connection, - params.nonce, - machine_id, - self.api_proxy, + connection, params.nonce, machine_id, self.api_proxy, ) else: self.log.debug("Missing API Proxy") @@ -1495,11 +1503,11 @@ class N2VC: if not self.authenticated: await self.login() - m = await self.get_model() - try: - m.remove_relation(a, b) - finally: - await m.disconnect() + # m = await self.get_model() + # try: + # m.remove_relation(a, b) + # finally: + # await m.disconnect() async def resolve_error(self, model_name, application=None): """Resolve units in error state.""" @@ -1511,25 +1519,17 @@ class N2VC: app = await self.get_application(model, application) if app: self.log.debug( - "JujuApi: Resolving errors for application {}".format( - application, - ) + "JujuApi: Resolving errors for application {}".format(application,) ) - for unit in app.units: + for _ in app.units: app.resolved(retry=True) async def run_action(self, model_name, application, action_name, **params): """Execute an action and return an Action object.""" if not self.authenticated: await self.login() - result = { - 'status': '', - 'action': { - 'tag': None, - 'results': None, - } - } + result = {"status": "", "action": {"tag": None, "results": None}} model = await self.get_model(model_name) @@ -1541,8 +1541,7 @@ class N2VC: self.log.debug( "JujuApi: Running Action {} against Application {}".format( - action_name, - application, + action_name, application, ) ) @@ -1551,9 +1550,9 @@ class N2VC: # Wait for the action to complete await action.wait() - result['status'] = action.status - result['action']['tag'] = action.data['id'] - result['action']['results'] = action.results + result["status"] = action.status + result["action"]["tag"] = action.data["id"] + result["action"]["results"] = action.results return result @@ -1564,16 +1563,20 @@ class N2VC: app = await self.get_application(model_name, application) if app: - self.log.debug("JujuApi: Setting config for Application {}".format( - application, - )) + self.log.debug( + "JujuApi: Setting config for Application {}".format(application,) + ) await app.set_config(config) # Verify the config is set newconf = await app.get_config() for key in config: - if config[key] != newconf[key]['value']: - self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key])) + if config[key] != newconf[key]["value"]: + self.log.debug( + ( + "JujuApi: Config not set! Key {} Value {} doesn't match {}" + ).format(key, config[key], newconf[key]) + ) # async def set_parameter(self, parameter, value, application=None): # """Set a config parameter for a service.""" @@ -1588,10 +1591,9 @@ class N2VC: # return await self.apply_config( # {parameter: value}, # application=application, - # ) + # ) - async def wait_for_application(self, model_name, application_name, - timeout=300): + async def wait_for_application(self, model_name, application_name, timeout=300): """Wait for an application to become active.""" if not self.authenticated: await self.login() @@ -1603,15 +1605,15 @@ class N2VC: if app: self.log.debug( "JujuApi: Waiting {} seconds for Application {}".format( - timeout, - application_name, + timeout, application_name, ) ) await model.block_until( lambda: all( - unit.agent_status == 'idle' and unit.workload_status in - ['active', 'unknown'] for unit in app.units + unit.agent_status == "idle" + and unit.workload_status in ["active", "unknown"] + for unit in app.units ), - timeout=timeout + timeout=timeout, ) diff --git a/requirements.txt b/requirements.txt index 9c558e3..a71d369 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,18 @@ -. +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common +juju +paramiko +pyasn1>=0.4.4 \ No newline at end of file diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 0000000..6cafdc8 --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1,18 @@ +# Copyright 2020 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +flake8<3.0 +mock +requests-mock +coverage==4.5.3 diff --git a/tox.ini b/tox.ini index 4685666..bd89241 100644 --- a/tox.ini +++ b/tox.ini @@ -12,66 +12,56 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Tox (http://tox.testrun.org/) is a tool for running tests -# in multiple virtualenvs. This configuration file will run the -# test suite on all supported python versions. To use it, "pip install tox" -# and then run "tox" from this directory. - [tox] -envlist = py3,lint,integration -skipsdist=True - -[pytest] -markers = - serial: mark a test that must run by itself +envlist = cover, flake8, pylint [testenv] -basepython=python3 -usedevelop=True -# for testing with other python versions -commands = py.test --ignore modules/ --ignore tests/charms/ --tb native -ra -v -s -n auto -k 'not integration' -m 'not serial' {posargs} -passenv = - HOME - VCA_PATH - VCA_HOST - VCA_PORT - VCA_USER - VCA_SECRET - LXD_HOST - LXD_SECRET - VCA_CACERT - # These are needed so executing `charm build` succeeds - TERM - TERMINFO +[testenv:cover] +basepython = python3 deps = - mock - pyyaml - pytest - pytest-asyncio - pytest-xdist - pytest-assume - paramiko - pylxd + nose2 + -rrequirements.txt + -rtest-requirements.txt +commands = + coverage erase + nose2 -C --coverage n2vc --plugin nose2.plugins.junitxml -s n2vc + coverage report --omit='*tests*' + coverage html -d ./cover --omit='*tests*' + coverage xml -o coverage.xml --omit='*tests*' -[testenv:py3] -# default tox env, excludes integration and serial tests +[testenv:pylint] +basepython = python3 +deps = + pylint + -rrequirements.txt commands = - pytest --ignore modules/ --ignore tests/charms/ --tb native -ra -v -n auto -k 'not integration' -m 'not serial' {posargs} + pylint -E n2vc -[testenv:lint] -envdir = {toxworkdir}/py3 +[testenv:black] +basepython = python3 +deps = + black commands = - flake8 --ignore E501,E402 --exclude tests/charms/builds,tests/charms/deps {posargs} n2vc tests + black --check --diff n2vc + +[testenv:flake8] +basepython = python3 deps = - flake8 + flake8 + -rrequirements.txt +commands = + flake8 n2vc -[testenv:integration] -envdir = {toxworkdir}/py3 -commands = py.test --ignore modules/ --ignore tests/charms/ --tb native -ra -v -s -n 1 -k 'integration' -m 'serial' {posargs} +[flake8] +# W503 is invalid PEP-8 +max-line-length = 88 +show-source = True +ignore = W503,E203 +exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,devops_stages/*,.rst [testenv:build] deps = - stdeb - setuptools-version-command + stdeb + setuptools-version-command commands = python3 setup.py --command-packages=stdeb.command bdist_deb -- 2.17.1