Merge "Add missing argument in notify_callback"
authorisraelad <adam.israel@canonical.com>
Tue, 3 Dec 2019 20:51:18 +0000 (21:51 +0100)
committerGerrit Code Review <root@osm.etsi.org>
Tue, 3 Dec 2019 20:51:18 +0000 (21:51 +0100)
n2vc/exceptions.py
n2vc/juju_observer.py
n2vc/k8s_conn.py
n2vc/k8s_helm_conn.py
n2vc/k8s_juju_conn.py
n2vc/loggable.py
n2vc/n2vc_conn.py
n2vc/n2vc_juju_conn.py
tests/test_k8s_juju_conn.py

index 35962b2..c8e8793 100644 (file)
@@ -40,10 +40,6 @@ class AuthenticationFailed(Exception):
     """The authentication for the specified user failed."""
 
 
-class InvalidCACertificate(Exception):
-    """The CA Certificate is not valid."""
-
-
 class NotImplemented(Exception):
     """The method is not implemented."""
 
@@ -126,4 +122,3 @@ class N2VCInvalidCertificate(N2VCException):
 
     def __str__(self):
         return '<{}> Invalid certificate: {}'.format(type(self), super().__str__())
-
index ac40f34..f4102a4 100644 (file)
@@ -52,7 +52,12 @@ class JujuModelObserver(ModelObserver):
         self.actions = dict()
 
     def register_machine(self, machine: Machine, db_dict: dict):
-        entity_id = machine.entity_id
+        try:
+            entity_id = machine.entity_id
+        except:
+            # no entity_id aatribute, try machine attribute
+            entity_id = machine.machine
+        self.n2vc.debug(msg='Registering machine for changes notifications: {}'.format(entity_id))
         entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)
         self.machines[entity_id] = entity
 
@@ -65,6 +70,7 @@ class JujuModelObserver(ModelObserver):
 
     def register_application(self, application: Application, db_dict: dict):
         entity_id = application.entity_id
+        self.n2vc.debug(msg='Registering application for changes notifications: {}'.format(entity_id))
         entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)
         self.applications[entity_id] = entity
 
@@ -77,6 +83,7 @@ class JujuModelObserver(ModelObserver):
 
     def register_action(self, action: Action, db_dict: dict):
         entity_id = action.entity_id
+        self.n2vc.debug(msg='Registering action for changes notifications: {}'.format(entity_id))
         entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)
         self.actions[entity_id] = entity
 
index d951c2c..11fb43e 100644 (file)
@@ -63,7 +63,7 @@ class K8sConnector(abc.ABC, Loggable):
             self,
             k8s_creds: str,
             namespace: str = 'kube-system',
-            reuse_cluster_uuid = None
+            reuse_cluster_uuid=None
     ) -> (str, bool):
         """
         It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides:
@@ -237,30 +237,33 @@ class K8sConnector(abc.ABC, Loggable):
     @abc.abstractmethod
     async def inspect_kdu(
             self,
-            kdu_model: str
+            kdu_model: str,
+            repo_url: str = None
     ) -> str:
         """
-        These calls will retrieve from the Charm/Bundle:
+        These calls will retrieve from the Chart/Bundle:
 
             - The list of configurable values and their defaults (e.g. in Charts, it would retrieve
                 the contents of `values.yaml`).
             - If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle.
 
-        :param cluster_uuid: the cluster to get the information
         :param kdu_model: chart/bundle reference
-        :return: If successful, it will return a dictionary containing the list of available parameters
-            and their default values
+        :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, even stable URL)
+        :return:
+
+        If successful, it will return the available parameters and their default values as provided by the backend.
         """
 
     @abc.abstractmethod
     async def help_kdu(
             self,
-            kdu_model: str
+            kdu_model: str,
+            repo_url: str = None
     ) -> str:
         """
 
-        :param cluster_uuid: the cluster to get the information
         :param kdu_model: chart/bundle reference
+        :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases, even stable URL)
         :return: If successful, it will return the contents of the 'readme.md'
         """
 
index eac34d4..2b15d76 100644 (file)
@@ -83,6 +83,16 @@ class K8sHelmConnector(K8sConnector):
         self._helm_command = helm_command
         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
 
+        # initialize helm client-only
+        self.debug('Initializing helm client-only...')
+        command = '{} init --client-only'.format(self._helm_command)
+        try:
+            asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
+            # loop = asyncio.get_event_loop()
+            # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
+        except Exception as e:
+            self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
+
         self.info('K8S Helm connector initialized')
 
     async def init_env(
@@ -186,7 +196,8 @@ class K8sHelmConnector(K8sConnector):
         kube_dir, helm_dir, config_filename, cluster_dir = \
             self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
 
-        command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
+        command = '{} --kubeconfig={} --home={} repo list --output yaml'\
+            .format(self._helm_command, config_filename, helm_dir)
 
         output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
         if output and len(output) > 0:
@@ -310,9 +321,6 @@ class K8sHelmConnector(K8sConnector):
 
         self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
 
-        start = time.time()
-        end = start + timeout
-
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
             self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
@@ -351,7 +359,7 @@ class K8sHelmConnector(K8sConnector):
                 if result is not None:
                     # instance already exists: generate a new one
                     kdu_instance = None
-            except:
+            except Exception as e:
                 kdu_instance = None
 
         # helm repo install
@@ -450,9 +458,6 @@ class K8sHelmConnector(K8sConnector):
 
         self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
 
-        start = time.time()
-        end = start + timeout
-
         # config filename
         kube_dir, helm_dir, config_filename, cluster_dir = \
             self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
@@ -502,7 +507,7 @@ class K8sHelmConnector(K8sConnector):
             )
 
             # wait for execution task
-            await asyncio.wait([ exec_task ])
+            await asyncio.wait([exec_task])
 
             # cancel status task
             status_task.cancel()
@@ -634,40 +639,47 @@ class K8sHelmConnector(K8sConnector):
 
     async def inspect_kdu(
             self,
-            kdu_model: str
+            kdu_model: str,
+            repo_url: str = None
     ) -> str:
 
-        self.debug('inspect kdu_model {}'.format(kdu_model))
+        self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
+
+        return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
 
-        command = '{} inspect values {}'\
-            .format(self._helm_command, kdu_model)
+    async def values_kdu(
+            self,
+            kdu_model: str,
+            repo_url: str = None
+    ) -> str:
 
-        output, rc = await self._local_async_exec(command=command)
+        self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
 
-        return output
+        return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
 
     async def help_kdu(
             self,
-            kdu_model: str
-    ):
+            kdu_model: str,
+            repo_url: str = None
+    ) -> str:
 
-        self.debug('help kdu_model {}'.format(kdu_model))
+        self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
 
-        command = '{} inspect readme {}'\
-            .format(self._helm_command, kdu_model)
-
-        output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-
-        return output
+        return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
 
     async def status_kdu(
             self,
             cluster_uuid: str,
             kdu_instance: str
-    ):
-
-        return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
+    ) -> str:
 
+        # call internal function
+        return await self._status_kdu(
+            cluster_uuid=cluster_uuid,
+            kdu_instance=kdu_instance,
+            show_error_log=True,
+            return_text=True
+        )
 
     """
     ##################################################################################################
@@ -675,11 +687,32 @@ class K8sHelmConnector(K8sConnector):
     ##################################################################################################
     """
 
+    async def _exec_inspect_comand(
+            self,
+            inspect_command: str,
+            kdu_model: str,
+            repo_url: str = None
+    ):
+
+        repo_str = ''
+        if repo_url:
+            repo_str = ' --repo {}'.format(repo_url)
+            idx = kdu_model.find('/')
+            if idx >= 0:
+                idx += 1
+                kdu_model = kdu_model[idx:]
+
+        inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
+        output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
+
+        return output
+
     async def _status_kdu(
             self,
             cluster_uuid: str,
             kdu_instance: str,
-            show_error_log: bool = False
+            show_error_log: bool = False,
+            return_text: bool = False
     ):
 
         self.debug('status of kdu_instance {}'.format(kdu_instance))
@@ -697,6 +730,9 @@ class K8sHelmConnector(K8sConnector):
             show_error_log=show_error_log
         )
 
+        if return_text:
+            return str(output)
+
         if rc != 0:
             return None
 
@@ -800,7 +836,7 @@ class K8sHelmConnector(K8sConnector):
             kdu_instance: str
     ) -> bool:
 
-        status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+        status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
 
         # extract info.status.resources-> str
         # format:
@@ -877,7 +913,6 @@ class K8sHelmConnector(K8sConnector):
     # params for use in -f file
     # returns values file option and filename (in order to delete it at the end)
     def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
-        params_str = ''
 
         if params and len(params) > 0:
             kube_dir, helm_dir, config_filename, cluster_dir = \
@@ -894,7 +929,7 @@ class K8sHelmConnector(K8sConnector):
             for key in params:
                 value = params.get(key)
                 if '!!yaml' in str(value):
-                    value = yaml.safe_load(value[7:])
+                    value = yaml.load(value[7:])
                 params2[key] = value
 
             values_file = get_random_number() + '.yaml'
@@ -1022,7 +1057,8 @@ class K8sHelmConnector(K8sConnector):
             self,
             command: str,
             raise_exception_on_error: bool = False,
-            show_error_log: bool = True
+            show_error_log: bool = True,
+            encode_utf8: bool = False
     ) -> (str, int):
 
         command = K8sHelmConnector._remove_multiple_spaces(command)
@@ -1046,8 +1082,10 @@ class K8sHelmConnector(K8sConnector):
             output = ''
             if stdout:
                 output = stdout.decode('utf-8').strip()
+                # output = stdout.decode()
             if stderr:
                 output = stderr.decode('utf-8').strip()
+                # output = stderr.decode()
 
             if return_code != 0 and show_error_log:
                 self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
@@ -1057,6 +1095,10 @@ class K8sHelmConnector(K8sConnector):
             if raise_exception_on_error and return_code != 0:
                 raise Exception(output)
 
+            if encode_utf8:
+                output = output.encode('utf-8').strip()
+                output = str(output).replace('\\n', '\n')
+
             return output, return_code
 
         except Exception as e:
@@ -1102,4 +1144,3 @@ class K8sHelmConnector(K8sConnector):
             if exception_if_not_exists:
                 self.error(msg)
                 raise Exception(msg)
-
index 4f62898..1f7cc00 100644 (file)
@@ -36,11 +36,15 @@ import yaml
 
 
 class K8sJujuConnector(K8sConnector):
+
     def __init__(
             self,
-            fs,
-            kubectl_command='/usr/bin/kubectl',
-            log=None
+            fs: object,
+            db: object,
+            kubectl_command: str = '/usr/bin/kubectl',
+            juju_command: str = '/usr/bin/juju',
+            log=None,
+            on_update_db=None,
     ):
         """
 
@@ -53,25 +57,30 @@ class K8sJujuConnector(K8sConnector):
         # parent class
         K8sConnector.__init__(
             self,
-            kubectl_command=kubectl_command,
-            fs=fs,
+            db,
             log=log,
+            on_update_db=on_update_db,
         )
 
+        self.fs = fs
         self.info('Initializing K8S Juju connector')
 
         self.authenticated = False
         self.models = {}
         self.log = logging.getLogger(__name__)
+
+        self.juju_command = juju_command
+        self.juju_secret = ""
+
         self.info('K8S Juju connector initialized')
 
     """Initialization"""
     async def init_env(
         self,
-        k8s_creds: dict,
+        k8s_creds: str,
         namespace: str = 'kube-system',
         reuse_cluster_uuid: str = None,
-    ) -> str:
+    ) -> (str, bool):
         """Initialize a Kubernetes environment
 
         :param k8s_creds dict: A dictionary containing the Kubernetes cluster
@@ -86,10 +95,6 @@ class K8sJujuConnector(K8sConnector):
         Bootstrapping cannot be done, by design, through the API. We need to
         use the CLI tools.
         """
-        # TODO: The path may change
-        jujudir = "/snap/bin"
-
-        self.k8scli = "{}/juju".format(jujudir)
 
         """
         WIP: Workflow
@@ -123,26 +128,22 @@ class K8sJujuConnector(K8sConnector):
             # Does the kubeconfig contain microk8s?
             microk8s = self.is_microk8s_by_credentials(k8s_creds)
 
-            if not microk8s:
-                # Name the new k8s cloud
-                k8s_cloud = "{}-k8s".format(namespace)
+            # Name the new k8s cloud
+            k8s_cloud = "{}-k8s".format(namespace)
 
-                await self.add_k8s(k8s_cloud, k8s_creds)
+            print("Adding k8s cloud {}".format(k8s_cloud))
+            await self.add_k8s(k8s_cloud, k8s_creds)
 
-                # Bootstrap Juju controller
-                self.bootstrap(k8s_cloud, cluster_uuid)
-            else:
-                # k8s_cloud = 'microk8s-test'
-                k8s_cloud = "{}-k8s".format(namespace)
-
-                await self.add_k8s(k8s_cloud, k8s_creds)
-
-                await self.bootstrap(k8s_cloud, cluster_uuid)
+            # Bootstrap Juju controller
+            print("Bootstrapping...")
+            await self.bootstrap(k8s_cloud, cluster_uuid, microk8s)
+            print("Bootstrap done.")
 
             # Get the controller information
 
             # Parse ~/.local/share/juju/controllers.yaml
             # controllers.testing.api-endpoints|ca-cert|uuid
+            print("Getting controller endpoints")
             with open(os.path.expanduser(
                 "~/.local/share/juju/controllers.yaml"
             )) as f:
@@ -154,6 +155,7 @@ class K8sJujuConnector(K8sConnector):
 
             # Parse ~/.local/share/juju/accounts
             # controllers.testing.user|password
+            print("Getting accounts")
             with open(os.path.expanduser(
                 "~/.local/share/juju/accounts.yaml"
             )) as f:
@@ -183,6 +185,7 @@ class K8sJujuConnector(K8sConnector):
 
             # Store the cluster configuration so it
             # can be used for subsequent calls
+            print("Setting config")
             await self.set_config(cluster_uuid, config)
 
         else:
@@ -199,17 +202,20 @@ class K8sJujuConnector(K8sConnector):
 
         # Login to the k8s cluster
         if not self.authenticated:
-            await self.login()
+            await self.login(cluster_uuid)
 
         # We're creating a new cluster
-        print("Getting model {}".format(self.get_namespace(cluster_uuid)))
-        model = await self.get_model(self.get_namespace(cluster_uuid))
+        print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
+        model = await self.get_model(
+            self.get_namespace(cluster_uuid), 
+            cluster_uuid=cluster_uuid
+        )
 
         # Disconnect from the model
         if model and model.is_connected():
             await model.disconnect()
 
-        return cluster_uuid
+        return cluster_uuid, True
 
     """Repo Management"""
     async def repo_add(
@@ -231,8 +237,10 @@ class K8sJujuConnector(K8sConnector):
 
     """Reset"""
     async def reset(
-        self,
-        cluster_uuid: str,
+            self,
+            cluster_uuid: str,
+            force: bool = False,
+            uninstall_sw: bool = False
     ) -> bool:
         """Reset a cluster
 
@@ -244,7 +252,7 @@ class K8sJujuConnector(K8sConnector):
 
         try:
             if not self.authenticated:
-                await self.login()
+                await self.login(cluster_uuid)
 
             if self.controller.is_connected():
                 # Destroy the model
@@ -280,14 +288,16 @@ class K8sJujuConnector(K8sConnector):
             print("Caught exception during reset: {}".format(ex))
 
     """Deployment"""
+
     async def install(
         self,
         cluster_uuid: str,
         kdu_model: str,
         atomic: bool = True,
-        timeout: int = None,
+        timeout: float = 300,
         params: dict = None,
-    ) -> str:
+        db_dict: dict = None
+    ) -> bool:
         """Install a bundle
 
         :param cluster_uuid str: The UUID of the cluster to install to
@@ -303,22 +313,51 @@ class K8sJujuConnector(K8sConnector):
 
         if not self.authenticated:
             print("[install] Logging in to the controller")
-            await self.login()
+            await self.login(cluster_uuid)
 
         ##
         # Get or create the model, based on the namespace the cluster was
         # instantiated with.
         namespace = self.get_namespace(cluster_uuid)
-        model = await self.get_model(namespace)
+
+        self.log.debug("Checking for model named {}".format(namespace))
+        model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
         if not model:
             # Create the new model
-            model = await self.add_model(namespace)
+            self.log.debug("Adding model: {}".format(namespace))
+            model = await self.add_model(namespace, cluster_uuid=cluster_uuid)
 
         if model:
             # TODO: Instantiation parameters
 
-            print("[install] deploying {}".format(kdu_model))
-            await model.deploy(kdu_model)
+            """
+            "Juju bundle that models the KDU, in any of the following ways:
+                - <juju-repo>/<juju-bundle>
+                - <juju-bundle folder under k8s_models folder in the package>
+                - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
+                - <URL_where_to_fetch_juju_bundle>
+            """
+
+            bundle = kdu_model
+            if kdu_model.startswith("cs:"):
+                bundle = kdu_model
+            elif kdu_model.startswith("http"):
+                # Download the file
+                pass
+            else:
+                # Local file
+
+                # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
+                # Uncompress temporarily
+                # bundle = <uncompressed file>
+                pass
+
+            if not bundle:
+                # Raise named exception that the bundle could not be found
+                raise Exception()
+
+            print("[install] deploying {}".format(bundle))
+            await model.deploy(bundle)
 
             # Get the application
             if atomic:
@@ -400,10 +439,10 @@ class K8sJujuConnector(K8sConnector):
         initial release.
         """
         namespace = self.get_namespace(cluster_uuid)
-        model = await self.get_model(namespace)
+        model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
 
         with open(kdu_model, 'r') as f:
-            bundle = yaml.load(f, Loader=yaml.FullLoader)
+            bundle = yaml.safe_load(f)
 
             """
             {
@@ -478,7 +517,7 @@ class K8sJujuConnector(K8sConnector):
         removed = False
 
         # Remove an application from the model
-        model = await self.get_model(self.get_namespace(cluster_uuid))
+        model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
 
         if model:
             # Get the application
@@ -514,7 +553,7 @@ class K8sJujuConnector(K8sConnector):
 
         kdu = {}
         with open(kdu_model, 'r') as f:
-            bundle = yaml.load(f, Loader=yaml.FullLoader)
+            bundle = yaml.safe_load(f)
 
             """
             {
@@ -580,7 +619,7 @@ class K8sJujuConnector(K8sConnector):
         """
         status = {}
 
-        model = await self.get_model(self.get_namespace(cluster_uuid))
+        model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
 
         # model = await self.get_model_by_uuid(cluster_uuid)
         if model:
@@ -602,7 +641,7 @@ class K8sJujuConnector(K8sConnector):
     async def add_k8s(
         self,
         cloud_name: str,
-        credentials: dict,
+        credentials: str,
     ) -> bool:
         """Add a k8s cloud to Juju
 
@@ -615,16 +654,19 @@ class K8sJujuConnector(K8sConnector):
 
         :returns: True if successful, otherwise raises an exception.
         """
-        cmd = [self.k8scli, "add-k8s", "--local", cloud_name]
 
+        cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
+        print(cmd)
         p = subprocess.run(
             cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
-            input=yaml.dump(credentials, Dumper=yaml.Dumper),
-            encoding='ascii'
+            # input=yaml.dump(credentials, Dumper=yaml.Dumper).encode("utf-8"),
+            input=credentials.encode("utf-8"),
+            # encoding='ascii'
         )
         retcode = p.returncode
+        print("add-k8s return code: {}".format(retcode))
 
         if retcode > 0:
             raise Exception(p.stderr)
@@ -632,7 +674,8 @@ class K8sJujuConnector(K8sConnector):
 
     async def add_model(
         self,
-        model_name: str
+        model_name: str,
+        cluster_uuid: str,
     ) -> juju.model.Model:
         """Adds a model to the controller
 
@@ -643,8 +686,9 @@ class K8sJujuConnector(K8sConnector):
                   raises an exception.
         """
         if not self.authenticated:
-            await self.login()
+            await self.login(cluster_uuid)
 
+        self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
         model = await self.controller.add_model(
             model_name,
             config={'authorized-keys': self.juju_public_key}
@@ -654,7 +698,8 @@ class K8sJujuConnector(K8sConnector):
     async def bootstrap(
         self,
         cloud_name: str,
-        cluster_uuid: str
+        cluster_uuid: str,
+        microk8s: bool
     ) -> bool:
         """Bootstrap a Kubernetes controller
 
@@ -662,9 +707,18 @@ class K8sJujuConnector(K8sConnector):
 
         :param cloud_name str: The name of the cloud.
         :param cluster_uuid str: The UUID of the cluster to bootstrap.
+        :param microk8s bool: If this is a microk8s cluster.
         :returns: True upon success or raises an exception.
         """
-        cmd = [self.k8scli, "bootstrap", cloud_name, cluster_uuid]
+
+        if microk8s:
+            cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
+        else:
+            """
+            For non-microk8s clusters, specify that the controller service is using a LoadBalancer.
+            """
+            cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]
+
         print("Bootstrapping controller {} in cloud {}".format(
             cluster_uuid, cloud_name
         ))
@@ -673,13 +727,13 @@ class K8sJujuConnector(K8sConnector):
             cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
-            encoding='ascii'
+            encoding='ascii'
         )
         retcode = p.returncode
 
         if retcode > 0:
             #
-            if 'already exists' not in p.stderr:
+            if b'already exists' not in p.stderr:
                 raise Exception(p.stderr)
 
         return True
@@ -696,7 +750,7 @@ class K8sJujuConnector(K8sConnector):
         :returns: True upon success or raises an exception.
         """
         cmd = [
-            self.k8scli,
+            self.juju_command,
             "destroy-controller",
             "--destroy-all-models",
             "--destroy-storage",
@@ -708,7 +762,7 @@ class K8sJujuConnector(K8sConnector):
             cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
-            encoding='ascii'
+            encoding='ascii'
         )
         retcode = p.returncode
 
@@ -731,7 +785,7 @@ class K8sJujuConnector(K8sConnector):
         cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
         if os.path.exists(cluster_config):
             with open(cluster_config, 'r') as f:
-                config = yaml.load(f.read(), Loader=yaml.FullLoader)
+                config = yaml.safe_load(f.read())
                 return config
         else:
             raise Exception(
@@ -743,6 +797,7 @@ class K8sJujuConnector(K8sConnector):
     async def get_model(
         self,
         model_name: str,
+        cluster_uuid: str,
     ) -> juju.model.Model:
         """Get a model from the Juju Controller.
 
@@ -753,12 +808,13 @@ class K8sJujuConnector(K8sConnector):
         :return The juju.model.Model object if found, or None.
         """
         if not self.authenticated:
-            await self.login()
+            await self.login(cluster_uuid)
 
         model = None
         models = await self.controller.list_models()
-
+        self.log.debug(models)
         if model_name in models:
+            self.log.debug("Found model: {}".format(model_name))
             model = await self.controller.get_model(
                 model_name
             )
@@ -818,7 +874,7 @@ class K8sJujuConnector(K8sConnector):
 
     def is_microk8s_by_credentials(
         self,
-        credentials: dict,
+        credentials: str,
     ) -> bool:
         """Check if a cluster is micro8s
 
@@ -827,13 +883,15 @@ class K8sJujuConnector(K8sConnector):
         :param credentials dict: A dictionary containing the k8s credentials
         :returns: A boolean if the cluster is running microk8s
         """
-        for context in credentials['contexts']:
-            if 'microk8s' in context['name']:
-                return True
+        creds = yaml.safe_load(credentials)
+        if creds:
+            for context in creds['contexts']:
+                if 'microk8s' in context['name']:
+                    return True
 
         return False
 
-    async def login(self):
+    async def login(self, cluster_uuid):
         """Login to the Juju controller."""
 
         if self.authenticated:
@@ -841,6 +899,15 @@ class K8sJujuConnector(K8sConnector):
 
         self.connecting = True
 
+        # Test: Make sure we have the credentials loaded
+        config = self.get_config(cluster_uuid)
+
+        self.juju_endpoint = config['endpoint']
+        self.juju_user = config['username']
+        self.juju_secret = config['secret']
+        self.juju_ca_cert = config['cacert']
+        self.juju_public_key = None
+
         self.controller = Controller()
 
         if self.juju_secret:
@@ -901,12 +968,12 @@ class K8sJujuConnector(K8sConnector):
         """
 
         # Remove the bootstrapped controller
-        cmd = [self.k8scli, "remove-k8s", "--client", cloud_name]
+        cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
         p = subprocess.run(
             cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
-            encoding='ascii'
+            encoding='ascii'
         )
         retcode = p.returncode
 
@@ -914,12 +981,12 @@ class K8sJujuConnector(K8sConnector):
             raise Exception(p.stderr)
 
         # Remove the cloud from the local config
-        cmd = [self.k8scli, "remove-cloud", "--client", cloud_name]
+        cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
         p = subprocess.run(
             cmd,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
-            encoding='ascii'
+            encoding='ascii'
         )
         retcode = p.returncode
 
index 40efa24..87a645d 100644 (file)
@@ -126,7 +126,7 @@ class Loggable:
         # datetime
         dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
         dt = dt + time_str
-        dt = time_str       # logger already shows datetime
+        dt = time_str       # logger already shows datetime
 
         # current thread
         if include_thread:
index 97b6188..d3aaf35 100644 (file)
@@ -106,6 +106,8 @@ class N2VCConnector(abc.ABC, Loggable):
         self.on_update_db = on_update_db
 
         # generate private/public key-pair
+        self.private_key_path = None
+        self.public_key_path = None
         self.get_public_key()
 
     @abc.abstractmethod
@@ -116,7 +118,7 @@ class N2VCConnector(abc.ABC, Loggable):
         """
 
     # TODO: review which public key
-    async def get_public_key(self) -> str:
+    def get_public_key(self) -> str:
         """Get the VCA ssh-public-key
 
         Returns the SSH public key from local mahine, to be injected into virtual machines to
index 2d2fbdb..aba88ee 100644 (file)
@@ -39,6 +39,9 @@ from juju.model import Model
 from juju.application import Application
 from juju.action import Action
 from juju.machine import Machine
+from juju.client import client
+
+from n2vc.provisioner import SSHProvisioner
 
 
 class N2VCJujuConnector(N2VCConnector):
@@ -58,8 +61,7 @@ class N2VCJujuConnector(N2VCConnector):
         url: str = '127.0.0.1:17070',
         username: str = 'admin',
         vca_config: dict = None,
-        on_update_db=None,
-        api_proxy=None
+        on_update_db=None
     ):
         """Initialize juju N2VC connector
         """
@@ -148,8 +150,9 @@ class N2VCJujuConnector(N2VCConnector):
         if self.ca_cert:
             self.ca_cert = base64_to_cacert(vca_config['ca_cert'])
 
-        if api_proxy:
-            self.api_proxy = api_proxy
+        if 'api_proxy' in vca_config:
+            self.api_proxy = vca_config['api_proxy']
+            self.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
         else:
             self.warning('api_proxy is not configured. Support for native charms is disabled')
 
@@ -286,7 +289,7 @@ class N2VCJujuConnector(N2VCConnector):
 
         # register machine on juju
         try:
-            machine = await self._juju_provision_machine(
+            machine_id = await self._juju_provision_machine(
                 model_name=model_name,
                 hostname=hostname,
                 username=username,
@@ -298,13 +301,14 @@ class N2VCJujuConnector(N2VCConnector):
         except Exception as e:
             self.error('Error registering machine: {}'.format(e))
             raise N2VCException(message='Error registering machine on juju: {}'.format(e))
-        self.info('Machine registered')
+
+        self.info('Machine registered: {}'.format(machine_id))
 
         # id for the execution environment
         ee_id = N2VCJujuConnector._build_ee_id(
             model_name=model_name,
             application_name=application_name,
-            machine_id=str(machine.entity_id)
+            machine_id=str(machine_id)
         )
 
         self.info('Execution environment registered. ee_id: {}'.format(ee_id))
@@ -808,9 +812,14 @@ class N2VCJujuConnector(N2VCConnector):
             db_dict: dict = None,
             progress_timeout: float = None,
             total_timeout: float = None
-    ) -> Machine:
+    ) -> str:
+
+        if not self.api_proxy:
+            msg = 'Cannot provision machine: api_proxy is not defined'
+            self.error(msg=msg)
+            raise N2VCException(message=msg)
 
-        self.debug('provisioning machine. model: {}, hostname: {}'.format(model_name, hostname))
+        self.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))
 
         if not self._authenticated:
             await self._juju_login()
@@ -819,30 +828,79 @@ class N2VCJujuConnector(N2VCConnector):
         model = await self._juju_get_model(model_name=model_name)
         observer = self.juju_observers[model_name]
 
-        spec = 'ssh:{}@{}:{}'.format(username, hostname, private_key_path)
-        self.debug('provisioning machine {}'.format(spec))
+        # TODO check if machine is already provisioned
+        machine_list = await model.get_machines()
+
+        provisioner = SSHProvisioner(
+            host=hostname,
+            user=username,
+            private_key_path=private_key_path,
+            log=self.log
+        )
+
+        params = None
         try:
-            machine = await model.add_machine(spec=spec)
-        except Exception as e:
-            import sys
-            import traceback
-            traceback.print_exc(file=sys.stdout)
-            print('-' * 60)
-            raise e
+            params = provisioner.provision_machine()
+        except Exception as ex:
+            msg = "Exception provisioning machine: {}".format(ex)
+            self.log.error(msg)
+            raise N2VCException(message=msg)
+
+        params.jobs = ['JobHostUnits']
+
+        connection = model.connection()
+
+        # Submit the request.
+        self.debug("Adding machine to model")
+        client_facade = client.ClientFacade.from_connection(connection)
+        results = await client_facade.AddMachines(params=[params])
+        error = results.machines[0].error
+        if error:
+            msg = "Error adding machine: {}}".format(error.message)
+            self.error(msg=msg)
+            raise ValueError(msg)
+
+        machine_id = results.machines[0].machine
+
+        # Need to run this after AddMachines has been called,
+        # as we need the machine_id
+        self.debug("Installing Juju agent into machine {}".format(machine_id))
+        asyncio.ensure_future(provisioner.install_agent(
+            connection=connection,
+            nonce=params.nonce,
+            machine_id=machine_id,
+            api=self.api_proxy,
+        ))
+
+        # wait for machine in model (now, machine is not yet in model, so we must wait for it)
+        machine = None
+        for i in range(10):
+            machine_list = await model.get_machines()
+            if machine_id in machine_list:
+                self.debug('Machine {} found in model!'.format(machine_id))
+                machine = model.machines.get(machine_id)
+                break
+            await asyncio.sleep(2)
+
+        if machine is None:
+            msg = 'Machine {} not found in model'.format(machine_id)
+            self.error(msg=msg)
+            raise Exception(msg)
 
         # register machine with observer
         observer.register_machine(machine=machine, db_dict=db_dict)
 
         # wait for machine creation
-        self.debug('waiting for provision completed... {}'.format(machine.entity_id))
+        self.debug('waiting for provision finishes... {}'.format(machine_id))
         await observer.wait_for_machine(
-            machine=machine,
+            machine_id=machine_id,
             progress_timeout=progress_timeout,
             total_timeout=total_timeout
         )
 
-        self.debug("Machine provisioned {}".format(machine.entity_id))
-        return machine
+        self.debug("Machine provisioned {}".format(machine_id))
+
+        return machine_id
 
     async def _juju_deploy_charm(
             self,
@@ -870,12 +928,14 @@ class N2VCJujuConnector(N2VCConnector):
             self.debug('deploying application {} to machine {}, model {}'
                        .format(application_name, machine_id, model_name))
             self.debug('charm: {}'.format(charm_path))
+            series = 'xenial'
+            # series = None
             application = await model.deploy(
                 entity_url=charm_path,
                 application_name=application_name,
                 channel='stable',
                 num_units=1,
-                series='xenial',
+                series=series,
                 to=machine_id
             )
 
index 6e1e98e..95b5f38 100644 (file)
@@ -62,24 +62,27 @@ async def main():
     fs.fs_connect(storage)
 
     client = n2vc.k8s_juju_conn.K8sJujuConnector(
-        kubectl_command = '/bin/true',
-        fs = fs,
+        kubectl_command='/snap/bin/kubectl',
+        juju_command='/snap/bin/juju',
+        fs=fs,
+        db=None,
     )
 
     # kubectl config view --raw
     # microk8s.config
 
     # if microk8s then
-    kubecfg = subprocess.getoutput('microk8s.config')
+    kubecfg = subprocess.getoutput('microk8s.config')
     # else
-    # kubecfg.subprocess.getoutput('kubectl config view --raw')
-    
-    k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader)
+    kubecfg = subprocess.getoutput('kubectl config view --raw')
+    # print(kubecfg)
+
+    # k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader)
     namespace = 'testing'
     kdu_model = "./tests/bundles/k8s-zookeeper.yaml"
 
     """init_env"""
-    cluster_uuid = await client.init_env(k8screds, namespace, reuse_cluster_uuid=reuse_cluster_uuid)
+    cluster_uuid, _ = await client.init_env(kubecfg, namespace, reuse_cluster_uuid=reuse_cluster_uuid)
     print(cluster_uuid)
 
     if not reuse_cluster_uuid: