Feature 9751: Centralized VCA for KNFs
- Use libjuju.py for the communication with VCA
- Add the k8s_cloud as an external cloud to the VCA
- Add unit tests
Change-Id: Id55bfada3957f35e13cef7b4bfcc7acb72452df0
Signed-off-by: David Garcia <david.garcia@canonical.com>
diff --git a/n2vc/juju_watcher.py b/n2vc/juju_watcher.py
index 842e990..e122786 100644
--- a/n2vc/juju_watcher.py
+++ b/n2vc/juju_watcher.py
@@ -49,8 +49,42 @@
class JujuModelWatcher:
@staticmethod
+ async def wait_for_model(model: Model, timeout: float = 3600):
+ """
+ Wait for all entities in model to reach its final state.
+
+ :param: model: Model to observe
+ :param: timeout: Timeout for the model applications to be active
+
+ :raises: asyncio.TimeoutError when timeout reaches
+ """
+
+ if timeout is None:
+ timeout = 3600.0
+
+ # Coroutine to wait until the entity reaches the final state
+ wait_for_entity = asyncio.ensure_future(
+ asyncio.wait_for(
+ model.block_until(
+ lambda: all(
+ entity_ready(entity) for entity in model.applications.values()
+ )
+ ),
+ timeout=timeout,
+ )
+ )
+
+ tasks = [wait_for_entity]
+ try:
+ await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+ finally:
+ # Cancel tasks
+ for task in tasks:
+ task.cancel()
+
+ @staticmethod
async def wait_for(
- model,
+ model: Model,
entity: ModelEntity,
progress_timeout: float = 3600,
total_timeout: float = 3600,
@@ -103,8 +137,6 @@
# Execute tasks, and stop when the first is finished
# The watcher task won't never finish (unless it timeouts)
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
- except Exception as e:
- raise e
finally:
# Cancel tasks
for task in tasks:
diff --git a/n2vc/k8s_juju_conn.py b/n2vc/k8s_juju_conn.py
index 70045b0..7e58deb 100644
--- a/n2vc/k8s_juju_conn.py
+++ b/n2vc/k8s_juju_conn.py
@@ -13,17 +13,17 @@
# limitations under the License.
import asyncio
-import concurrent
import os
import uuid
import yaml
+import tempfile
-from juju.controller import Controller
-from juju.model import Model
-from n2vc.exceptions import K8sException
+from n2vc.exceptions import K8sException, N2VCBadArgumentsException
from n2vc.k8s_conn import K8sConnector
from n2vc.kubectl import Kubectl
-from .exceptions import MethodNotImplemented, N2VCNotFound
+from .exceptions import MethodNotImplemented
+from n2vc.utils import base64_to_cacert
+from n2vc.libjuju import Libjuju
# from juju.bundle import BundleHandler
@@ -43,24 +43,57 @@
vca_config: dict = None,
):
"""
-
+ :param fs: file system for kubernetes and helm configuration
+ :param db: Database object
:param kubectl_command: path to kubectl executable
:param helm_command: path to helm executable
- :param fs: file system for kubernetes and helm configuration
:param log: logger
+ :param: loop: Asyncio loop
"""
# parent class
K8sConnector.__init__(
- self, db, log=log, on_update_db=on_update_db,
+ self,
+ db,
+ log=log,
+ on_update_db=on_update_db,
)
self.fs = fs
+ self.loop = loop or asyncio.get_event_loop()
self.log.debug("Initializing K8S Juju connector")
- self.juju_command = juju_command
- self.juju_public_key = None
+ required_vca_config = [
+ "host",
+ "user",
+ "secret",
+ "ca_cert",
+ ]
+ if not vca_config or not all(k in vca_config for k in required_vca_config):
+ raise N2VCBadArgumentsException(
+ message="Missing arguments in vca_config: {}".format(vca_config),
+ bad_args=required_vca_config,
+ )
+ port = vca_config["port"] if "port" in vca_config else 17070
+ url = "{}:{}".format(vca_config["host"], port)
+ enable_os_upgrade = vca_config.get("enable_os_upgrade", True)
+ apt_mirror = vca_config.get("apt_mirror", None)
+ username = vca_config["user"]
+ secret = vca_config["secret"]
+ ca_cert = base64_to_cacert(vca_config["ca_cert"])
+ self.libjuju = Libjuju(
+ endpoint=url,
+ api_proxy=None, # Not needed for k8s charms
+ enable_os_upgrade=enable_os_upgrade,
+ apt_mirror=apt_mirror,
+ username=username,
+ password=secret,
+ cacert=ca_cert,
+ loop=self.loop,
+ log=self.log,
+ db=self.db,
+ )
self.log.debug("K8S Juju connector initialized")
# TODO: Remove these commented lines:
# self.authenticated = False
@@ -88,26 +121,26 @@
(on error, an exception will be raised)
"""
- """Bootstrapping
+ # """Bootstrapping
- Bootstrapping cannot be done, by design, through the API. We need to
- use the CLI tools.
- """
+ # Bootstrapping cannot be done, by design, through the API. We need to
+ # use the CLI tools.
+ # """
- """
- WIP: Workflow
+ # """
+ # WIP: Workflow
- 1. Has the environment already been bootstrapped?
- - Check the database to see if we have a record for this env
+ # 1. Has the environment already been bootstrapped?
+ # - Check the database to see if we have a record for this env
- 2. If this is a new env, create it
- - Add the k8s cloud to Juju
- - Bootstrap
- - Record it in the database
+ # 2. If this is a new env, create it
+ # - Add the k8s cloud to Juju
+ # - Bootstrap
+ # - Record it in the database
- 3. Connect to the Juju controller for this cloud
+ # 3. Connect to the Juju controller for this cloud
- """
+ # """
# cluster_uuid = reuse_cluster_uuid
# if not cluster_uuid:
# cluster_uuid = str(uuid4())
@@ -131,60 +164,73 @@
cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
# Is a local k8s cluster?
- localk8s = self.is_local_k8s(k8s_creds)
+ # localk8s = self.is_local_k8s(k8s_creds)
# If the k8s is external, the juju controller needs a loadbalancer
- loadbalancer = False if localk8s else True
+ # loadbalancer = False if localk8s else True
# Name the new k8s cloud
- k8s_cloud = "k8s-{}".format(cluster_uuid)
+ # k8s_cloud = "k8s-{}".format(cluster_uuid)
- self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
- await self.add_k8s(k8s_cloud, k8s_creds)
+ # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
+ # await self.add_k8s(k8s_cloud, k8s_creds)
# Bootstrap Juju controller
- self.log.debug("Bootstrapping...")
- await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
- self.log.debug("Bootstrap done.")
+ # self.log.debug("Bootstrapping...")
+ # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
+ # self.log.debug("Bootstrap done.")
# Get the controller information
# Parse ~/.local/share/juju/controllers.yaml
# controllers.testing.api-endpoints|ca-cert|uuid
- self.log.debug("Getting controller endpoints")
- with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
- controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers["controllers"][cluster_uuid]
- endpoints = controller["api-endpoints"]
- juju_endpoint = endpoints[0]
- juju_ca_cert = controller["ca-cert"]
+ # self.log.debug("Getting controller endpoints")
+ # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
+ # controllers = yaml.load(f, Loader=yaml.Loader)
+ # controller = controllers["controllers"][cluster_uuid]
+ # endpoints = controller["api-endpoints"]
+ # juju_endpoint = endpoints[0]
+ # juju_ca_cert = controller["ca-cert"]
# Parse ~/.local/share/juju/accounts
# controllers.testing.user|password
- self.log.debug("Getting accounts")
- with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
- controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers["controllers"][cluster_uuid]
+ # self.log.debug("Getting accounts")
+ # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
+ # controllers = yaml.load(f, Loader=yaml.Loader)
+ # controller = controllers["controllers"][cluster_uuid]
- juju_user = controller["user"]
- juju_secret = controller["password"]
+ # juju_user = controller["user"]
+ # juju_secret = controller["password"]
- config = {
- "endpoint": juju_endpoint,
- "username": juju_user,
- "secret": juju_secret,
- "cacert": juju_ca_cert,
- "loadbalancer": loadbalancer,
- }
+ # config = {
+ # "endpoint": juju_endpoint,
+ # "username": juju_user,
+ # "secret": juju_secret,
+ # "cacert": juju_ca_cert,
+ # "loadbalancer": loadbalancer,
+ # }
# Store the cluster configuration so it
# can be used for subsequent calls
- self.log.debug("Setting config")
- await self.set_config(cluster_uuid, config)
+
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(k8s_creds)
+ kubectl = Kubectl(config_file=kubecfg.name)
+ configuration = kubectl.get_configuration()
+ default_storage_class = kubectl.get_default_storage_class()
+ await self.libjuju.add_k8s(
+ name=cluster_uuid,
+ configuration=configuration,
+ storage_class=default_storage_class,
+ credential_name=self._get_credential_name(cluster_uuid),
+ )
+ # self.log.debug("Setting config")
+ # await self.set_config(cluster_uuid, config)
# Test connection
- controller = await self.get_controller(cluster_uuid)
- await controller.disconnect()
+ # controller = await self.get_controller(cluster_uuid)
+ # await controller.disconnect()
# TODO: Remove these commented lines
# raise Exception("EOL")
@@ -210,7 +256,10 @@
"""Repo Management"""
async def repo_add(
- self, name: str, url: str, _type: str = "charm",
+ self,
+ name: str,
+ url: str,
+ _type: str = "charm",
):
raise MethodNotImplemented()
@@ -218,7 +267,8 @@
raise MethodNotImplemented()
async def repo_remove(
- self, name: str,
+ self,
+ name: str,
):
raise MethodNotImplemented()
@@ -244,29 +294,31 @@
try:
# Remove k8scluster from database
- self.log.debug("[reset] Removing k8scluster from juju database")
- juju_db = self.db.get_one("admin", {"_id": "juju"})
+ # self.log.debug("[reset] Removing k8scluster from juju database")
+ # juju_db = self.db.get_one("admin", {"_id": "juju"})
- for k in juju_db["k8sclusters"]:
- if k["_id"] == cluster_uuid:
- juju_db["k8sclusters"].remove(k)
- self.db.set_one(
- table="admin",
- q_filter={"_id": "juju"},
- update_dict={"k8sclusters": juju_db["k8sclusters"]},
- )
- break
+ # for k in juju_db["k8sclusters"]:
+ # if k["_id"] == cluster_uuid:
+ # juju_db["k8sclusters"].remove(k)
+ # self.db.set_one(
+ # table="admin",
+ # q_filter={"_id": "juju"},
+ # update_dict={"k8sclusters": juju_db["k8sclusters"]},
+ # )
+ # break
# Destroy the controller (via CLI)
- self.log.debug("[reset] Destroying controller")
- await self.destroy_controller(cluster_uuid)
+ # self.log.debug("[reset] Destroying controller")
+ # await self.destroy_controller(cluster_uuid)
self.log.debug("[reset] Removing k8s cloud")
- k8s_cloud = "k8s-{}".format(cluster_uuid)
- await self.remove_cloud(k8s_cloud)
+ # k8s_cloud = "k8s-{}".format(cluster_uuid)
+ # await self.remove_cloud(k8s_cloud)
+ await self.libjuju.remove_cloud(cluster_uuid)
- except Exception as ex:
- self.log.debug("Caught exception during reset: {}".format(ex))
+ except Exception as e:
+ self.log.debug("Caught exception during reset: {}".format(e))
+ raise e
return True
# TODO: Remove these commented lines
# if not self.authenticated:
@@ -290,7 +342,7 @@
cluster_uuid: str,
kdu_model: str,
atomic: bool = True,
- timeout: float = 300,
+ timeout: float = 1800,
params: dict = None,
db_dict: dict = None,
kdu_name: str = None,
@@ -310,12 +362,29 @@
:return: If successful, returns ?
"""
+ bundle = kdu_model
- controller = await self.get_controller(cluster_uuid)
+ # controller = await self.get_controller(cluster_uuid)
##
# Get or create the model, based on the NS
# uuid.
+
+ if not db_dict:
+ raise K8sException("db_dict must be set")
+ if not bundle:
+ raise K8sException("bundle must be set")
+
+ if bundle.startswith("cs:"):
+ pass
+ elif bundle.startswith("http"):
+ # Download the file
+ pass
+ else:
+ new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
+ os.chdir(new_workdir)
+ bundle = "local:{}".format(kdu_model)
+
if kdu_name:
kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
else:
@@ -325,84 +394,70 @@
# Create the new model
self.log.debug("Adding model: {}".format(kdu_instance))
- model = await self.add_model(
- kdu_instance, cluster_uuid=cluster_uuid, controller=controller
+ await self.libjuju.add_model(
+ model_name=kdu_instance,
+ cloud_name=cluster_uuid,
+ credential_name=self._get_credential_name(cluster_uuid),
)
- if model:
- # TODO: Instantiation parameters
+ # if model:
+ # TODO: Instantiation parameters
- """
- "Juju bundle that models the KDU, in any of the following ways:
- - <juju-repo>/<juju-bundle>
- - <juju-bundle folder under k8s_models folder in the package>
- - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
- in the package>
- - <URL_where_to_fetch_juju_bundle>
- """
- try:
- previous_workdir = os.getcwd()
- except FileNotFoundError:
- previous_workdir = "/app/storage"
+ """
+ "Juju bundle that models the KDU, in any of the following ways:
+ - <juju-repo>/<juju-bundle>
+ - <juju-bundle folder under k8s_models folder in the package>
+ - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
+ in the package>
+ - <URL_where_to_fetch_juju_bundle>
+ """
+ try:
+ previous_workdir = os.getcwd()
+ except FileNotFoundError:
+ previous_workdir = "/app/storage"
- bundle = kdu_model
- if kdu_model.startswith("cs:"):
- bundle = kdu_model
- elif kdu_model.startswith("http"):
- # Download the file
- pass
- else:
- new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
+ self.log.debug("[install] deploying {}".format(bundle))
+ await self.libjuju.deploy(
+ bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
+ )
- os.chdir(new_workdir)
+ # Get the application
+ # if atomic:
+ # # applications = model.applications
+ # self.log.debug("[install] Applications: {}".format(model.applications))
+ # for name in model.applications:
+ # self.log.debug("[install] Waiting for {} to settle".format(name))
+ # application = model.applications[name]
+ # try:
+ # # It's not enough to wait for all units to be active;
+ # # the application status needs to be active as well.
+ # self.log.debug("Waiting for all units to be active...")
+ # await model.block_until(
+ # lambda: all(
+ # unit.agent_status == "idle"
+ # and application.status in ["active", "unknown"]
+ # and unit.workload_status in ["active", "unknown"]
+ # for unit in application.units
+ # ),
+ # timeout=timeout,
+ # )
+ # self.log.debug("All units active.")
- bundle = "local:{}".format(kdu_model)
+ # # TODO use asyncio.TimeoutError
+ # except concurrent.futures._base.TimeoutError:
+ # os.chdir(previous_workdir)
+ # self.log.debug("[install] Timeout exceeded; resetting cluster")
+ # await self.reset(cluster_uuid)
+ # return False
- if not bundle:
- # Raise named exception that the bundle could not be found
- raise Exception()
+ # Wait for the application to be active
+ # if model.is_connected():
+ # self.log.debug("[install] Disconnecting model")
+ # await model.disconnect()
+ # await controller.disconnect()
+ os.chdir(previous_workdir)
- self.log.debug("[install] deploying {}".format(bundle))
- await model.deploy(bundle)
-
- # Get the application
- if atomic:
- # applications = model.applications
- self.log.debug("[install] Applications: {}".format(model.applications))
- for name in model.applications:
- self.log.debug("[install] Waiting for {} to settle".format(name))
- application = model.applications[name]
- try:
- # It's not enough to wait for all units to be active;
- # the application status needs to be active as well.
- self.log.debug("Waiting for all units to be active...")
- await model.block_until(
- lambda: all(
- unit.agent_status == "idle"
- and application.status in ["active", "unknown"]
- and unit.workload_status in ["active", "unknown"]
- for unit in application.units
- ),
- timeout=timeout,
- )
- self.log.debug("All units active.")
-
- # TODO use asyncio.TimeoutError
- except concurrent.futures._base.TimeoutError:
- os.chdir(previous_workdir)
- self.log.debug("[install] Timeout exceeded; resetting cluster")
- await self.reset(cluster_uuid)
- return False
-
- # Wait for the application to be active
- if model.is_connected():
- self.log.debug("[install] Disconnecting model")
- await model.disconnect()
- await controller.disconnect()
- os.chdir(previous_workdir)
-
- return kdu_instance
- raise Exception("Unable to install")
+ return kdu_instance
async def instances_list(self, cluster_uuid: str) -> list:
"""
@@ -502,7 +557,10 @@
"""Rollback"""
async def rollback(
- self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ revision: int = 0,
) -> str:
"""Rollback a model
@@ -527,14 +585,14 @@
:return: Returns True if successful, or raises an exception
"""
- controller = await self.get_controller(cluster_uuid)
+ # controller = await self.get_controller(cluster_uuid)
self.log.debug("[uninstall] Destroying model")
- await controller.destroy_models(kdu_instance)
+ await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
- self.log.debug("[uninstall] Model destroyed and disconnecting")
- await controller.disconnect()
+ # self.log.debug("[uninstall] Model destroyed and disconnecting")
+ # await controller.disconnect()
return True
# TODO: Remove these commented lines
@@ -563,7 +621,7 @@
:return: Returns the output of the action
"""
- controller = await self.get_controller(cluster_uuid)
+ # controller = await self.get_controller(cluster_uuid)
if not params or "application-name" not in params:
raise K8sException(
@@ -575,34 +633,40 @@
"[exec_primitive] Getting model "
"kdu_instance: {}".format(kdu_instance)
)
-
- model = await self.get_model(kdu_instance, controller=controller)
-
application_name = params["application-name"]
- application = model.applications[application_name]
-
- actions = await application.get_actions()
+ actions = await self.libjuju.get_actions(application_name, kdu_instance)
if primitive_name not in actions:
raise K8sException("Primitive {} not found".format(primitive_name))
-
- unit = None
- for u in application.units:
- if await u.is_leader_from_status():
- unit = u
- break
-
- if unit is None:
- raise K8sException("No leader unit found to execute action")
-
- self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
- action = await unit.run_action(primitive_name, **params)
-
- output = await model.get_action_output(action_uuid=action.entity_id)
- status = await model.get_action_status(uuid_or_prefix=action.entity_id)
-
- status = (
- status[action.entity_id] if action.entity_id in status else "failed"
+ output, status = await self.libjuju.execute_action(
+ application_name, kdu_instance, primitive_name, **params
)
+ # model = await self.get_model(kdu_instance, controller=controller)
+
+ # application_name = params["application-name"]
+ # application = model.applications[application_name]
+
+ # actions = await application.get_actions()
+ # if primitive_name not in actions:
+ # raise K8sException("Primitive {} not found".format(primitive_name))
+
+ # unit = None
+ # for u in application.units:
+ # if await u.is_leader_from_status():
+ # unit = u
+ # break
+
+ # if unit is None:
+ # raise K8sException("No leader unit found to execute action")
+
+ # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
+ # action = await unit.run_action(primitive_name, **params)
+
+ # output = await model.get_action_output(action_uuid=action.entity_id)
+ # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+
+ # status = (
+ # status[action.entity_id] if action.entity_id in status else "failed"
+ # )
if status != "completed":
raise K8sException(
@@ -615,8 +679,8 @@
error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
self.log.error(error_msg)
raise K8sException(message=error_msg)
- finally:
- await controller.disconnect()
+ # finally:
+ # await controller.disconnect()
# TODO: Remove these commented lines:
# if not self.authenticated:
# self.log.debug("[exec_primitive] Connecting to controller")
@@ -624,7 +688,10 @@
"""Introspection"""
- async def inspect_kdu(self, kdu_model: str,) -> dict:
+ async def inspect_kdu(
+ self,
+ kdu_model: str,
+ ) -> dict:
"""Inspect a KDU
Inspects a bundle and returns a dictionary of config parameters and
@@ -637,8 +704,11 @@
"""
kdu = {}
+ if not os.path.exists(kdu_model):
+ raise K8sException("file {} not found".format(kdu_model))
+
with open(kdu_model, "r") as f:
- bundle = yaml.safe_load(f)
+ bundle = yaml.safe_load(f.read())
"""
{
@@ -663,7 +733,10 @@
return kdu
- async def help_kdu(self, kdu_model: str,) -> str:
+ async def help_kdu(
+ self,
+ kdu_model: str,
+ ) -> str:
"""View the README
If available, returns the README of the bundle.
@@ -684,7 +757,11 @@
return readme
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
+ async def status_kdu(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ ) -> dict:
"""Get the status of the KDU
Get the current status of the KDU instance.
@@ -696,18 +773,18 @@
and deployment_time.
"""
status = {}
- controller = await self.get_controller(cluster_uuid)
- model = await self.get_model(kdu_instance, controller=controller)
+ # controller = await self.get_controller(cluster_uuid)
+ # model = await self.get_model(kdu_instance, controller=controller)
- model_status = await model.get_status()
- status = model_status.applications
-
+ # model_status = await model.get_status()
+ # status = model_status.applications
+ model_status = await self.libjuju.get_model_status(kdu_instance)
for name in model_status.applications:
application = model_status.applications[name]
status[name] = {"status": application["status"]["status"]}
- await model.disconnect()
- await controller.disconnect()
+ # await model.disconnect()
+ # await controller.disconnect()
return status
@@ -718,15 +795,19 @@
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
- config_path = "/tmp/{}".format(cluster_uuid)
- config_file = "{}/config".format(config_path)
+ # config_path = "/tmp/{}".format(cluster_uuid)
+ # config_file = "{}/config".format(config_path)
- if not os.path.exists(config_path):
- os.makedirs(config_path)
- with open(config_file, "w") as f:
- f.write(credentials)
+ # if not os.path.exists(config_path):
+ # os.makedirs(config_path)
+ # with open(config_file, "w") as f:
+ # f.write(credentials)
- kubectl = Kubectl(config_file=config_file)
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(credentials)
+ kubectl = Kubectl(config_file=kubecfg.name)
+
return kubectl.get_services(
field_selector="metadata.namespace={}".format(kdu_instance)
)
@@ -738,15 +819,18 @@
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
- config_path = "/tmp/{}".format(cluster_uuid)
- config_file = "{}/config".format(config_path)
+ # config_path = "/tmp/{}".format(cluster_uuid)
+ # config_file = "{}/config".format(config_path)
- if not os.path.exists(config_path):
- os.makedirs(config_path)
- with open(config_file, "w") as f:
- f.write(credentials)
+ # if not os.path.exists(config_path):
+ # os.makedirs(config_path)
+ # with open(config_file, "w") as f:
+ # f.write(credentials)
- kubectl = Kubectl(config_file=config_file)
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(credentials)
+ kubectl = Kubectl(config_file=kubecfg.name)
return kubectl.get_services(
field_selector="metadata.name={},metadata.namespace={}".format(
@@ -755,154 +839,154 @@
)[0]
# Private methods
- async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
- """Add a k8s cloud to Juju
+ # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
+ # """Add a k8s cloud to Juju
- Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
- Juju Controller.
+ # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
+ # Juju Controller.
- :param cloud_name str: The name of the cloud to add.
- :param credentials dict: A dictionary representing the output of
- `kubectl config view --raw`.
+ # :param cloud_name str: The name of the cloud to add.
+ # :param credentials dict: A dictionary representing the output of
+ # `kubectl config view --raw`.
- :returns: True if successful, otherwise raises an exception.
- """
+ # :returns: True if successful, otherwise raises an exception.
+ # """
- cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
- self.log.debug(cmd)
+ # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
+ # self.log.debug(cmd)
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE,
- )
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd,
+ # stdout=asyncio.subprocess.PIPE,
+ # stderr=asyncio.subprocess.PIPE,
+ # stdin=asyncio.subprocess.PIPE,
+ # )
- # Feed the process the credentials
- process.stdin.write(credentials.encode("utf-8"))
- await process.stdin.drain()
- process.stdin.close()
+ # # Feed the process the credentials
+ # process.stdin.write(credentials.encode("utf-8"))
+ # await process.stdin.drain()
+ # process.stdin.close()
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- self.log.debug("add-k8s return code: {}".format(return_code))
+ # self.log.debug("add-k8s return code: {}".format(return_code))
- if return_code > 0:
- raise Exception(stderr)
+ # if return_code > 0:
+ # raise Exception(stderr)
- return True
+ # return True
- async def add_model(
- self, model_name: str, cluster_uuid: str, controller: Controller
- ) -> Model:
- """Adds a model to the controller
+ # async def add_model(
+ # self, model_name: str, cluster_uuid: str, controller: Controller
+ # ) -> Model:
+ # """Adds a model to the controller
- Adds a new model to the Juju controller
+ # Adds a new model to the Juju controller
- :param model_name str: The name of the model to add.
- :param cluster_uuid str: ID of the cluster.
- :param controller: Controller object in which the model will be added
- :returns: The juju.model.Model object of the new model upon success or
- raises an exception.
- """
+ # :param model_name str: The name of the model to add.
+ # :param cluster_uuid str: ID of the cluster.
+ # :param controller: Controller object in which the model will be added
+ # :returns: The juju.model.Model object of the new model upon success or
+ # raises an exception.
+ # """
- self.log.debug(
- "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
- )
- model = None
- try:
- if self.juju_public_key is not None:
- model = await controller.add_model(
- model_name, config={"authorized-keys": self.juju_public_key}
- )
- else:
- model = await controller.add_model(model_name)
- except Exception as ex:
- self.log.debug(ex)
- self.log.debug("Caught exception: {}".format(ex))
- pass
+ # self.log.debug(
+ # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
+ # )
+ # model = None
+ # try:
+ # if self.juju_public_key is not None:
+ # model = await controller.add_model(
+ # model_name, config={"authorized-keys": self.juju_public_key}
+ # )
+ # else:
+ # model = await controller.add_model(model_name)
+ # except Exception as ex:
+ # self.log.debug(ex)
+ # self.log.debug("Caught exception: {}".format(ex))
+ # pass
- return model
+ # return model
- async def bootstrap(
- self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
- ) -> bool:
- """Bootstrap a Kubernetes controller
+ # async def bootstrap(
+ # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
+ # ) -> bool:
+ # """Bootstrap a Kubernetes controller
- Bootstrap a Juju controller inside the Kubernetes cluster
+ # Bootstrap a Juju controller inside the Kubernetes cluster
- :param cloud_name str: The name of the cloud.
- :param cluster_uuid str: The UUID of the cluster to bootstrap.
- :param loadbalancer bool: If the controller should use loadbalancer or not.
- :returns: True upon success or raises an exception.
- """
+ # :param cloud_name str: The name of the cloud.
+ # :param cluster_uuid str: The UUID of the cluster to bootstrap.
+ # :param loadbalancer bool: If the controller should use loadbalancer or not.
+ # :returns: True upon success or raises an exception.
+ # """
- if not loadbalancer:
- cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
- else:
- """
- For public clusters, specify that the controller service is using a
- LoadBalancer.
- """
- cmd = [
- self.juju_command,
- "bootstrap",
- cloud_name,
- cluster_uuid,
- "--config",
- "controller-service-type=loadbalancer",
- ]
+ # if not loadbalancer:
+ # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
+ # else:
+ # """
+ # For public clusters, specify that the controller service is using a
+ # LoadBalancer.
+ # """
+ # cmd = [
+ # self.juju_command,
+ # "bootstrap",
+ # cloud_name,
+ # cluster_uuid,
+ # "--config",
+ # "controller-service-type=loadbalancer",
+ # ]
- self.log.debug(
- "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
- )
+ # self.log.debug(
+ # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
+ # )
- process = await asyncio.create_subprocess_exec(
- *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- )
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- if return_code > 0:
- #
- if b"already exists" not in stderr:
- raise Exception(stderr)
+ # if return_code > 0:
+ # #
+ # if b"already exists" not in stderr:
+ # raise Exception(stderr)
- return True
+ # return True
- async def destroy_controller(self, cluster_uuid: str) -> bool:
- """Destroy a Kubernetes controller
+ # async def destroy_controller(self, cluster_uuid: str) -> bool:
+ # """Destroy a Kubernetes controller
- Destroy an existing Kubernetes controller.
+ # Destroy an existing Kubernetes controller.
- :param cluster_uuid str: The UUID of the cluster to bootstrap.
- :returns: True upon success or raises an exception.
- """
- cmd = [
- self.juju_command,
- "destroy-controller",
- "--destroy-all-models",
- "--destroy-storage",
- "-y",
- cluster_uuid,
- ]
+ # :param cluster_uuid str: The UUID of the cluster to bootstrap.
+ # :returns: True upon success or raises an exception.
+ # """
+ # cmd = [
+ # self.juju_command,
+ # "destroy-controller",
+ # "--destroy-all-models",
+ # "--destroy-storage",
+ # "-y",
+ # cluster_uuid,
+ # ]
- process = await asyncio.create_subprocess_exec(
- *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- )
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- if return_code > 0:
- #
- if "already exists" not in stderr:
- raise Exception(stderr)
+ # if return_code > 0:
+ # #
+ # if "already exists" not in stderr:
+ # raise Exception(stderr)
def get_credentials(self, cluster_uuid: str) -> str:
"""
@@ -922,68 +1006,86 @@
return yaml.safe_dump(k8scluster.get("credentials"))
- def get_config(self, cluster_uuid: str,) -> dict:
- """Get the cluster configuration
-
- Gets the configuration of the cluster
-
- :param cluster_uuid str: The UUID of the cluster.
- :return: A dict upon success, or raises an exception.
+ def _get_credential_name(self, cluster_uuid: str) -> str:
"""
+ Get credential name for a k8s cloud
- juju_db = self.db.get_one("admin", {"_id": "juju"})
- config = None
- for k in juju_db["k8sclusters"]:
- if k["_id"] == cluster_uuid:
- config = k["config"]
- self.db.encrypt_decrypt_fields(
- config,
- "decrypt",
- ["secret", "cacert"],
- schema_version="1.1",
- salt=k["_id"],
- )
- break
- if not config:
- raise Exception(
- "Unable to locate configuration for cluster {}".format(cluster_uuid)
- )
- return config
+ We cannot use the cluster_uuid for the credential name directly,
+ because it cannot start with a number, it must start with a letter.
+ Therefore, the k8s cloud credential name will be "cred-" followed
+ by the cluster uuid.
- async def get_model(self, model_name: str, controller: Controller) -> Model:
- """Get a model from the Juju Controller.
+ :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
- Note: Model objects returned must call disconnected() before it goes
- out of scope.
-
- :param model_name str: The name of the model to get
- :param controller Controller: Controller object
- :return The juju.model.Model object if found, or None.
+ :return: Name to use for the credential name.
"""
+ return "cred-{}".format(cluster_uuid)
- models = await controller.list_models()
- if model_name not in models:
- raise N2VCNotFound("Model {} not found".format(model_name))
- self.log.debug("Found model: {}".format(model_name))
- return await controller.get_model(model_name)
+ # def get_config(self, cluster_uuid: str,) -> dict:
+ # """Get the cluster configuration
- def get_namespace(self, cluster_uuid: str,) -> str:
+ # Gets the configuration of the cluster
+
+ # :param cluster_uuid str: The UUID of the cluster.
+ # :return: A dict upon success, or raises an exception.
+ # """
+
+ # juju_db = self.db.get_one("admin", {"_id": "juju"})
+ # config = None
+ # for k in juju_db["k8sclusters"]:
+ # if k["_id"] == cluster_uuid:
+ # config = k["config"]
+ # self.db.encrypt_decrypt_fields(
+ # config,
+ # "decrypt",
+ # ["secret", "cacert"],
+ # schema_version="1.1",
+ # salt=k["_id"],
+ # )
+ # break
+ # if not config:
+ # raise Exception(
+ # "Unable to locate configuration for cluster {}".format(cluster_uuid)
+ # )
+ # return config
+
+ # async def get_model(self, model_name: str, controller: Controller) -> Model:
+ # """Get a model from the Juju Controller.
+
+ # Note: Model objects returned must call disconnected() before it goes
+ # out of scope.
+
+ # :param model_name str: The name of the model to get
+ # :param controller Controller: Controller object
+ # :return The juju.model.Model object if found, or None.
+ # """
+
+ # models = await controller.list_models()
+ # if model_name not in models:
+ # raise N2VCNotFound("Model {} not found".format(model_name))
+ # self.log.debug("Found model: {}".format(model_name))
+ # return await controller.get_model(model_name)
+
+ def get_namespace(
+ self,
+ cluster_uuid: str,
+ ) -> str:
"""Get the namespace UUID
Gets the namespace's unique name
:param cluster_uuid str: The UUID of the cluster
:returns: The namespace UUID, or raises an exception
"""
- config = self.get_config(cluster_uuid)
+ # config = self.get_config(cluster_uuid)
# Make sure the name is in the config
- if "namespace" not in config:
- raise Exception("Namespace not found.")
+ # if "namespace" not in config:
+ # raise Exception("Namespace not found.")
# TODO: We want to make sure this is unique to the cluster, in case
# the cluster is being reused.
# Consider pre/appending the cluster id to the namespace string
- return config["namespace"]
+ pass
# TODO: Remove these lines of code
# async def has_model(self, model_name: str) -> bool:
@@ -1000,57 +1102,57 @@
# return True
# return False
- def is_local_k8s(self, credentials: str,) -> bool:
- """Check if a cluster is local
+ # def is_local_k8s(self, credentials: str,) -> bool:
+ # """Check if a cluster is local
- Checks if a cluster is running in the local host
+ # Checks if a cluster is running in the local host
- :param credentials dict: A dictionary containing the k8s credentials
- :returns: A boolean if the cluster is running locally
- """
+ # :param credentials dict: A dictionary containing the k8s credentials
+ # :returns: A boolean if the cluster is running locally
+ # """
- creds = yaml.safe_load(credentials)
+ # creds = yaml.safe_load(credentials)
- if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
- for cluster in creds["clusters"]:
- if "server" in cluster["cluster"]:
- if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
- return True
+ # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
+ # for cluster in creds["clusters"]:
+ # if "server" in cluster["cluster"]:
+ # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
+ # return True
- return False
+ # return False
- async def get_controller(self, cluster_uuid):
- """Login to the Juju controller."""
+ # async def get_controller(self, cluster_uuid):
+ # """Login to the Juju controller."""
- config = self.get_config(cluster_uuid)
+ # config = self.get_config(cluster_uuid)
- juju_endpoint = config["endpoint"]
- juju_user = config["username"]
- juju_secret = config["secret"]
- juju_ca_cert = config["cacert"]
+ # juju_endpoint = config["endpoint"]
+ # juju_user = config["username"]
+ # juju_secret = config["secret"]
+ # juju_ca_cert = config["cacert"]
- controller = Controller()
+ # controller = Controller()
- if juju_secret:
- self.log.debug(
- "Connecting to controller... ws://{} as {}".format(
- juju_endpoint, juju_user,
- )
- )
- try:
- await controller.connect(
- endpoint=juju_endpoint,
- username=juju_user,
- password=juju_secret,
- cacert=juju_ca_cert,
- )
- self.log.debug("JujuApi: Logged into controller")
- return controller
- except Exception as ex:
- self.log.debug(ex)
- self.log.debug("Caught exception: {}".format(ex))
- else:
- self.log.fatal("VCA credentials not configured.")
+ # if juju_secret:
+ # self.log.debug(
+ # "Connecting to controller... ws://{} as {}".format(
+ # juju_endpoint, juju_user,
+ # )
+ # )
+ # try:
+ # await controller.connect(
+ # endpoint=juju_endpoint,
+ # username=juju_user,
+ # password=juju_secret,
+ # cacert=juju_ca_cert,
+ # )
+ # self.log.debug("JujuApi: Logged into controller")
+ # return controller
+ # except Exception as ex:
+ # self.log.debug(ex)
+ # self.log.debug("Caught exception: {}".format(ex))
+ # else:
+ # self.log.fatal("VCA credentials not configured.")
# TODO: Remove these commented lines
# self.authenticated = False
@@ -1078,66 +1180,66 @@
# self.authenticated = False
- async def remove_cloud(self, cloud_name: str,) -> bool:
- """Remove a k8s cloud from Juju
+ # async def remove_cloud(self, cloud_name: str,) -> bool:
+ # """Remove a k8s cloud from Juju
- Removes a Kubernetes cloud from Juju.
+ # Removes a Kubernetes cloud from Juju.
- :param cloud_name str: The name of the cloud to add.
+ # :param cloud_name str: The name of the cloud to add.
- :returns: True if successful, otherwise raises an exception.
- """
+ # :returns: True if successful, otherwise raises an exception.
+ # """
- # Remove the bootstrapped controller
- cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
- process = await asyncio.create_subprocess_exec(
- *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- )
+ # # Remove the bootstrapped controller
+ # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- if return_code > 0:
- raise Exception(stderr)
+ # if return_code > 0:
+ # raise Exception(stderr)
- # Remove the cloud from the local config
- cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
- process = await asyncio.create_subprocess_exec(
- *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- )
+ # # Remove the cloud from the local config
+ # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- if return_code > 0:
- raise Exception(stderr)
+ # if return_code > 0:
+ # raise Exception(stderr)
- return True
+ # return True
- async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
- """Save the cluster configuration
+ # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
+ # """Save the cluster configuration
- Saves the cluster information to the Mongo database
+ # Saves the cluster information to the Mongo database
- :param cluster_uuid str: The UUID of the cluster
- :param config dict: A dictionary containing the cluster configuration
- """
+ # :param cluster_uuid str: The UUID of the cluster
+ # :param config dict: A dictionary containing the cluster configuration
+ # """
- juju_db = self.db.get_one("admin", {"_id": "juju"})
+ # juju_db = self.db.get_one("admin", {"_id": "juju"})
- k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
- self.db.encrypt_decrypt_fields(
- config,
- "encrypt",
- ["secret", "cacert"],
- schema_version="1.1",
- salt=cluster_uuid,
- )
- k8sclusters.append({"_id": cluster_uuid, "config": config})
- self.db.set_one(
- table="admin",
- q_filter={"_id": "juju"},
- update_dict={"k8sclusters": k8sclusters},
- )
+ # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
+ # self.db.encrypt_decrypt_fields(
+ # config,
+ # "encrypt",
+ # ["secret", "cacert"],
+ # schema_version="1.1",
+ # salt=cluster_uuid,
+ # )
+ # k8sclusters.append({"_id": cluster_uuid, "config": config})
+ # self.db.set_one(
+ # table="admin",
+ # q_filter={"_id": "juju"},
+ # update_dict={"k8sclusters": k8sclusters},
+ # )
diff --git a/n2vc/libjuju.py b/n2vc/libjuju.py
index 4702414..aa7afa1 100644
--- a/n2vc/libjuju.py
+++ b/n2vc/libjuju.py
@@ -165,7 +165,8 @@
:param: controller: Controller that will be disconnected
"""
- await controller.disconnect()
+ if controller:
+ await controller.disconnect()
async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
"""
@@ -491,6 +492,28 @@
return machine_id
+ async def deploy(
+ self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
+ ):
+ """
+ Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
+
+ :param: uri: Path or Charm Store uri in which the charm or bundle can be found
+ :param: model_name: Model name
+ :param: wait: Indicates whether to wait or not until all applications are active
+ :param: timeout: Time in seconds to wait until all applications are active
+ """
+ controller = await self.get_controller()
+ model = await self.get_model(controller, model_name)
+ try:
+ await model.deploy(uri)
+ if wait:
+ await JujuModelWatcher.wait_for_model(model, timeout=timeout)
+ self.log.debug("All units active in model {}".format(model_name))
+ finally:
+ await self.disconnect_model(model)
+ await self.disconnect_controller(controller)
+
async def deploy_charm(
self,
application_name: str,
@@ -1001,6 +1024,7 @@
:param: interval: Time in seconds between checks
"""
+ controller = None
while True:
try:
controller = await self.get_controller()
@@ -1141,7 +1165,7 @@
auth_type = "certificate"
else:
raise JujuInvalidK8sConfiguration("authentication method not supported")
- return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
+ return client.CloudCredential(auth_type=auth_type, attrs=attrs)
async def add_cloud(
self,
diff --git a/n2vc/tests/unit/test_juju_watcher.py b/n2vc/tests/unit/test_juju_watcher.py
index 593ff0d..41065bf 100644
--- a/n2vc/tests/unit/test_juju_watcher.py
+++ b/n2vc/tests/unit/test_juju_watcher.py
@@ -140,3 +140,29 @@
value = status(application)
mock_derive_status.assert_called_once()
self.assertTrue(isinstance(value, str))
+
+
+class WaitForModelTest(asynctest.TestCase):
+ @asynctest.mock.patch("juju.client.connector.Connector.connect")
+ def setUp(self, mock_connect=None):
+ self.loop = asyncio.new_event_loop()
+ self.model = Model()
+
+ @asynctest.mock.patch("juju.model.Model.block_until")
+ def test_wait_for_model(self, mock_block_until):
+ self.loop.run_until_complete(
+ JujuModelWatcher.wait_for_model(self.model, timeout=None)
+ )
+ mock_block_until.assert_called()
+
+ @asynctest.mock.patch("asyncio.ensure_future")
+ @asynctest.mock.patch("asyncio.wait")
+ def test_wait_for_model_exception(self, mock_wait, mock_ensure_future):
+ task = Mock()
+ mock_ensure_future.return_value = task
+ mock_wait.side_effect = Exception
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ JujuModelWatcher.wait_for_model(self.model, timeout=None)
+ )
+ task.cancel.assert_called()
diff --git a/n2vc/tests/unit/test_k8s_juju_conn.py b/n2vc/tests/unit/test_k8s_juju_conn.py
new file mode 100644
index 0000000..50e827e
--- /dev/null
+++ b/n2vc/tests/unit/test_k8s_juju_conn.py
@@ -0,0 +1,778 @@
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import logging
+import asynctest
+from n2vc.k8s_juju_conn import K8sJujuConnector
+from osm_common import fslocal
+from .utils import kubeconfig, FakeModel, FakeFileWrapper
+from n2vc.exceptions import (
+ MethodNotImplemented,
+ K8sException,
+ N2VCBadArgumentsException,
+)
+from unittest.mock import Mock
+from .utils import AsyncMock
+
+
+class K8sJujuConnTestCase(asynctest.TestCase):
+ @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
+ @asynctest.mock.patch("juju.client.connector.Connector.connect")
+ @asynctest.mock.patch("juju.controller.Controller.connection")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.Libjuju")
+ def setUp(
+ self,
+ mock_libjuju=None,
+ mock_base64_to_cacert=None,
+ mock_connection=None,
+ mock_connect=None,
+ mock_update_endpoints=None,
+ ):
+ self.loop = asyncio.get_event_loop()
+ mock_libjuju.return_value = AsyncMock()
+ db = Mock()
+ vca_config = {
+ "secret": "secret",
+ "api_proxy": "api_proxy",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+
+ logging.disable(logging.CRITICAL)
+
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=vca_config,
+ on_update_db=None,
+ )
+
+
+class K8sJujuConnInitSuccessTestCase(asynctest.TestCase):
+ def setUp(
+ self,
+ ):
+ logging.disable(logging.CRITICAL)
+
+ @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
+ @asynctest.mock.patch("juju.client.connector.Connector.connect")
+ @asynctest.mock.patch("juju.controller.Controller.connection")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert")
+ @asynctest.mock.patch("n2vc.libjuju.Libjuju.__init__")
+ def test_success(
+ self,
+ mock_libjuju=None,
+ mock_base64_to_cacert=None,
+ mock_connection=None,
+ mock_connect=None,
+ mock_update_endpoints=None,
+ ):
+ mock_libjuju.return_value = None
+ loop = asyncio.get_event_loop()
+ log = logging.getLogger()
+ db = Mock()
+ vca_config = {
+ "secret": "secret",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+ K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=log,
+ loop=self.loop,
+ vca_config=vca_config,
+ on_update_db=None,
+ )
+
+ mock_libjuju.assert_called_once_with(
+ endpoint="1.1.1.1:17070",
+ api_proxy=None, # Not needed for k8s charms
+ enable_os_upgrade=True,
+ apt_mirror=None,
+ username="user",
+ password="secret",
+ cacert=mock_base64_to_cacert.return_value,
+ loop=loop,
+ log=log,
+ db=db,
+ )
+
+
+class K8sJujuConnectorInitFailureTestCase(asynctest.TestCase):
+ def setUp(
+ self,
+ ):
+ self.loop = asyncio.get_event_loop()
+ logging.disable(logging.CRITICAL)
+ self.vca_config = {
+ "secret": "secret",
+ "api_proxy": "api_proxy",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+
+ def test_missing_vca_config_host(self):
+ db = Mock()
+ self.vca_config.pop("host")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_user(self):
+ db = Mock()
+ self.vca_config.pop("user")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_secret(self):
+ db = Mock()
+ self.vca_config.pop("secret")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_ca_cert(self):
+ db = Mock()
+ self.vca_config.pop("ca_cert")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+
+class InitEnvTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InitEnvTest, self).setUp()
+ self.k8s_juju_conn.libjuju.add_k8s = AsyncMock()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_with_cluster_uuid(
+ self,
+ mock_get_default_storage_class,
+ ):
+ reuse_cluster_uuid = "uuid"
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(
+ k8s_creds=kubeconfig, reuse_cluster_uuid=reuse_cluster_uuid
+ )
+ )
+
+ self.assertTrue(created)
+ self.assertEqual(uuid, reuse_cluster_uuid)
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_with_no_cluster_uuid(self, mock_get_default_storage_class):
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
+ )
+
+ self.assertTrue(created)
+ self.assertTrue(isinstance(uuid, str))
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_init_env_exception(self, mock_get_default_storage_class):
+ self.k8s_juju_conn.libjuju.add_k8s.side_effect = Exception()
+ created = None
+ uuid = None
+ with self.assertRaises(Exception):
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
+ )
+
+ self.assertIsNone(created)
+ self.assertIsNone(uuid)
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+
+class NotImplementedTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(NotImplementedTest, self).setUp()
+
+ def test_repo_add(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_add("", ""))
+
+ def test_repo_list(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_list())
+
+ def test_repo_remove(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_remove(""))
+
+ def test_synchronize_repos(self):
+ self.assertIsNone(
+ self.loop.run_until_complete(self.k8s_juju_conn.synchronize_repos("", ""))
+ )
+
+ def test_upgrade(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.upgrade("", ""))
+
+ def test_rollback(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.rollback("", ""))
+
+ def test_get_namespace(self):
+ self.assertIsNone(self.k8s_juju_conn.get_namespace(""))
+
+ def test_instances_list(self):
+ res = self.loop.run_until_complete(self.k8s_juju_conn.instances_list(""))
+ self.assertEqual(res, [])
+
+
+class ResetTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(ResetTest, self).setUp()
+ self.k8s_juju_conn.libjuju.remove_cloud = AsyncMock()
+
+ def test_success(self):
+ removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
+ self.assertTrue(removed)
+ self.k8s_juju_conn.libjuju.remove_cloud.assert_called_once()
+
+ def test_exception(self):
+ removed = None
+ self.k8s_juju_conn.libjuju.remove_cloud.side_effect = Exception()
+ with self.assertRaises(Exception):
+ removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
+ self.assertIsNone(removed)
+ self.k8s_juju_conn.libjuju.remove_cloud.assert_called_once()
+
+
+@asynctest.mock.patch("os.chdir")
+class InstallTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InstallTest, self).setUp()
+ self.db_dict = {"filter": {"_id": "id"}}
+ self.local_bundle = "bundle"
+ self.cs_bundle = "cs:bundle"
+ self.http_bundle = "https://example.com/bundle.yaml"
+ self.kdu_name = "kdu_name"
+ self.cluster_uuid = "cluster"
+ self.k8s_juju_conn.libjuju.add_model = AsyncMock()
+ self.k8s_juju_conn.libjuju.deploy = AsyncMock()
+
+ def test_success_local(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.local_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.assertEqual(mock_chdir.call_count, 2)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ "local:{}".format(self.local_bundle),
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_cs(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_http(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.http_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.http_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_not_kdu_name(self, mock_chdir):
+ expected_kdu_instance = "id"
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_missing_db_dict(self, mock_chdir):
+ kdu_instance = None
+ with self.assertRaises(K8sException):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ timeout=1800,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_not_called()
+ self.k8s_juju_conn.libjuju.deploy.assert_not_called()
+
+ @asynctest.mock.patch("os.getcwd")
+ def test_getcwd_exception(self, mock_getcwd, mock_chdir):
+ mock_getcwd.side_effect = FileNotFoundError()
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_missing_bundle(self, mock_chdir):
+ kdu_instance = None
+ with self.assertRaises(K8sException):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ "",
+ atomic=True,
+ kdu_name=self.kdu_name,
+ timeout=1800,
+ db_dict=self.db_dict,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_not_called()
+ self.k8s_juju_conn.libjuju.deploy.assert_not_called()
+
+ def test_missing_exception(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = None
+ self.k8s_juju_conn.libjuju.deploy.side_effect = Exception()
+ with self.assertRaises(Exception):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.local_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ "local:{}".format(self.local_bundle),
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+
+class UninstallTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(UninstallTest, self).setUp()
+ self.k8s_juju_conn.libjuju.destroy_model = AsyncMock()
+
+ def test_success(self):
+ destroyed = self.loop.run_until_complete(
+ self.k8s_juju_conn.uninstall("cluster_uuid", "model_name")
+ )
+ self.assertTrue(destroyed)
+ self.k8s_juju_conn.libjuju.destroy_model.assert_called_once()
+
+ def test_exception(self):
+ destroyed = None
+ self.k8s_juju_conn.libjuju.destroy_model.side_effect = Exception()
+ with self.assertRaises(Exception):
+ destroyed = self.loop.run_until_complete(
+ self.k8s_juju_conn.uninstall("cluster_uuid", "model_name")
+ )
+ self.assertIsNone(destroyed)
+ self.k8s_juju_conn.libjuju.destroy_model.assert_called_once()
+
+
+class ExecPrimitivesTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(ExecPrimitivesTest, self).setUp()
+ self.action_name = "touch"
+ self.application_name = "myapp"
+ self.model_name = "model"
+ self.k8s_juju_conn.libjuju.get_actions = AsyncMock()
+ self.k8s_juju_conn.libjuju.execute_action = AsyncMock()
+
+ def test_success(self):
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (
+ "success",
+ "completed",
+ )
+
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertEqual(output, "success")
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+ def test_exception(self):
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.side_effect = Exception()
+ output = None
+
+ with self.assertRaises(Exception):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+ def test_missing_application_name_in_params(self):
+ params = {}
+ output = None
+
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_not_called()
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_params(self):
+ output = None
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_not_called()
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_action(self):
+ output = None
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (
+ "success",
+ "completed",
+ )
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, "non-existing-action", params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_not_completed(self):
+ output = None
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (None, "failed")
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+
+class InspectKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InspectKduTest, self).setUp()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.path.exists")
+ def test_existing_file(self, mock_exists, mock_open):
+ mock_exists.return_value = True
+ content = """{
+ 'description': 'test bundle',
+ 'bundle': 'kubernetes',
+ 'applications': {'app':{ }, 'app2': { }}
+ }"""
+ mock_open.return_value = FakeFileWrapper(content=content)
+ kdu = self.loop.run_until_complete(self.k8s_juju_conn.inspect_kdu("model"))
+ self.assertEqual(kdu, {"app": {}, "app2": {}})
+ mock_exists.assert_called_once()
+ mock_open.assert_called_once()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.path.exists")
+ def test_not_existing_file(self, mock_exists, mock_open):
+ kdu = None
+ mock_exists.return_value = False
+ with self.assertRaises(K8sException):
+ kdu = self.loop.run_until_complete(self.k8s_juju_conn.inspect_kdu("model"))
+ self.assertEqual(kdu, None)
+ mock_exists.assert_called_once_with("model")
+ mock_open.assert_not_called()
+
+
+class HelpKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(HelpKduTest, self).setUp()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.listdir")
+ def test_existing_file(self, mock_listdir, mock_open):
+ content = "Readme file content"
+ mock_open.return_value = FakeFileWrapper(content=content)
+ for file in ["README.md", "README.txt", "README"]:
+ mock_listdir.return_value = [file]
+ help = self.loop.run_until_complete(
+ self.k8s_juju_conn.help_kdu("kdu_instance")
+ )
+ self.assertEqual(help, content)
+
+ self.assertEqual(mock_listdir.call_count, 3)
+ self.assertEqual(mock_open.call_count, 3)
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.listdir")
+ def test_not_existing_file(self, mock_listdir, mock_open):
+ for file in ["src/charm.py", "tox.ini", "requirements.txt"]:
+ mock_listdir.return_value = [file]
+ help = self.loop.run_until_complete(
+ self.k8s_juju_conn.help_kdu("kdu_instance")
+ )
+ self.assertEqual(help, None)
+
+ self.assertEqual(mock_listdir.call_count, 3)
+ self.assertEqual(mock_open.call_count, 0)
+
+
+class StatusKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(StatusKduTest, self).setUp()
+ self.k8s_juju_conn.libjuju.get_model_status = AsyncMock()
+
+ def test_success(self):
+ applications = {"app": {"status": {"status": "active"}}}
+ model = FakeModel(applications=applications)
+ self.k8s_juju_conn.libjuju.get_model_status.return_value = model
+ status = self.loop.run_until_complete(
+ self.k8s_juju_conn.status_kdu("cluster", "kdu_instance")
+ )
+ self.assertEqual(status, {"app": {"status": "active"}})
+ self.k8s_juju_conn.libjuju.get_model_status.assert_called_once()
+
+ def test_exception(self):
+ self.k8s_juju_conn.libjuju.get_model_status.side_effect = Exception()
+ status = None
+ with self.assertRaises(Exception):
+ status = self.loop.run_until_complete(
+ self.k8s_juju_conn.status_kdu("cluster", "kdu_instance")
+ )
+ self.assertIsNone(status)
+ self.k8s_juju_conn.libjuju.get_model_status.assert_called_once()
+
+
+class GetServicesTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetServicesTest, self).setUp()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
+ def test_success(self, mock_get_credentials, mock_get_services):
+ mock_get_credentials.return_value = kubeconfig
+ self.loop.run_until_complete(self.k8s_juju_conn.get_services("", "", ""))
+ mock_get_credentials.assert_called_once()
+ mock_get_services.assert_called_once()
+
+
+class GetServiceTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetServiceTest, self).setUp()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
+ def test_success(self, mock_get_credentials, mock_get_services):
+ mock_get_credentials.return_value = kubeconfig
+ self.loop.run_until_complete(self.k8s_juju_conn.get_service("", "", ""))
+ mock_get_credentials.assert_called_once()
+ mock_get_services.assert_called_once()
+
+
+class GetCredentialsTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetCredentialsTest, self).setUp()
+
+ @asynctest.mock.patch("yaml.safe_dump")
+ def test_success(self, mock_safe_dump):
+ self.k8s_juju_conn.db.get_one.return_value = {
+ "_id": "id",
+ "credentials": "credentials",
+ "schema_version": "2",
+ }
+ self.k8s_juju_conn.get_credentials("cluster_uuid")
+ self.k8s_juju_conn.db.get_one.assert_called_once()
+ self.k8s_juju_conn.db.encrypt_decrypt_fields.assert_called_once()
+ mock_safe_dump.assert_called_once()
diff --git a/n2vc/tests/unit/test_libjuju.py b/n2vc/tests/unit/test_libjuju.py
index 76bbebe..454b87f 100644
--- a/n2vc/tests/unit/test_libjuju.py
+++ b/n2vc/tests/unit/test_libjuju.py
@@ -457,6 +457,71 @@
@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_model")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
+@asynctest.mock.patch("n2vc.juju_watcher.JujuModelWatcher.wait_for_model")
+@asynctest.mock.patch("juju.model.Model.deploy")
+class DeployTest(LibjujuTestCase):
+ def setUp(self):
+ super(DeployTest, self).setUp()
+
+ def test_deploy(
+ self,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_get_model.return_value = juju.model.Model()
+ self.loop.run_until_complete(
+ self.libjuju.deploy("cs:osm", "model", wait=True, timeout=0)
+ )
+ mock_deploy.assert_called_once()
+ mock_wait_for_model.assert_called_once()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_deploy_no_wait(
+ self,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_get_model.return_value = juju.model.Model()
+ self.loop.run_until_complete(
+ self.libjuju.deploy("cs:osm", "model", wait=False, timeout=0)
+ )
+ mock_deploy.assert_called_once()
+ mock_wait_for_model.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_deploy_exception(
+ self,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ mock_deploy.side_effect = Exception()
+ mock_get_model.return_value = juju.model.Model()
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(self.libjuju.deploy("cs:osm", "model"))
+ mock_deploy.assert_called_once()
+ mock_wait_for_model.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_model")
+@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
@asynctest.mock.patch(
"juju.model.Model.applications", new_callable=asynctest.PropertyMock
)
diff --git a/n2vc/tests/unit/utils.py b/n2vc/tests/unit/utils.py
index d960c70..ac86cdd 100644
--- a/n2vc/tests/unit/utils.py
+++ b/n2vc/tests/unit/utils.py
@@ -19,6 +19,50 @@
from unittest.mock import MagicMock
+kubeconfig = """apiVersion: v1
+clusters:
+- cluster:
+ certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1\
+ JSURBVENDQWVtZ0F3SUJBZ0lKQUxjMk9xVUpwcnVCTUEwR0NTcUdTSWIzRFFFQk\
+ N3VUFNQmN4RlRBVEJnTlYKQkFNTURERXdMakUxTWk0eE9ETXVNVEFlRncweU1EQ\
+ TVNVEV4TkRJeU16VmFGdzB6TURBNU1Ea3hOREl5TXpWYQpNQmN4RlRBVEJnTlZC\
+ QU1NRERFd0xqRTFNaTR4T0RNdU1UQ0NBU0l3RFFZSktvWklodmNOQVFFQkJRQUR\
+ nZ0VQCkFEQ0NBUW9DZ2dFQkFNV0tyQkdxWlJRT0VONDExR2RESmY2ckZWRDcvMU\
+ xHNlZMWjNhd1BRdHBhRTRxdVdyNisKWjExTWwra2kwVEU1cGZFV3dKenVUZXlCU\
+ XVkUEpnYm1QTjF1VWROdGRiNlpocHEzeC9oT0hCMVJLNC9iSlNFUgpiZ0dITmN6\
+ MzR6SHRaZ1dwb2NPTXpPOW9oRUdhMTZUaDhmQWVxYU1CQTJRaklmeUFlaVp3VHJ\
+ nZ3BrY2dBMUlOCjBvQkdqSURnSGVoSU5tbGZOOURkQ3hNN1FNTmtSbzRXdE13bF\
+ JSRWZ4QnFiVkNpZGFjbVhhb1VPUjJPeFVmQWEKN1orSUU1TmN5ZFQ1TGovazdwd\
+ XZCVkdIa0JQWnE0TmlBa3R4aXd5NVB5R29GTk9mT0NrV2I2VnBzVzNhTlNJeAo4\
+ aXBITkc3enV3elc1TGQ5TkhQYWpRckZwdFZBSHpJNWNhRUNBd0VBQWFOUU1FNHd\
+ IUVlEVlIwT0JCWUVGQ1dVCkFaTXNaeE13L1k1OGlXMGZJWVAzcDdTYk1COEdBMV\
+ VkSXdRWU1CYUFGQ1dVQVpNc1p4TXcvWTU4aVcwZklZUDMKcDdTYk1Bd0dBMVVkR\
+ XdRRk1BTUJBZjh3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUJaMlYxMWowRzhh\
+ Z1Z6Twp2YWtKTGt4UGZ0UE1NMFFOaVRzZmV6RzlicnBkdEVLSjFyalFCblNXYTN\
+ WbThWRGZTYkhLQUNXaGh0OEhzcXhtCmNzdVQyOWUyaGZBNHVIOUxMdy9MVG5EdE\
+ tJSjZ6aWFzaTM5RGh3UGwwaExuamJRMjk4VVo5TGovVlpnZGlqemIKWnVPdHlpT\
+ nVOS0E2Nmd0dGxXcWZRQ2hkbnJ5MlZUbjBjblR5dU9UalByYWdOdXJMdlVwL3Nl\
+ eURhZmsxNXJ4egozcmlYZldiQnRhUUk1dnM0ekFKU2xneUg2RnpiZStoTUhlUzF\
+ mM2ppb3dJV0lRR2NNbHpGT1RpMm1xWFRybEJYCnh1WmpLZlpOcndjQVNGbk9qYV\
+ BWeFQ1ODJ4WWhtTm8wR3J2MlZEck51bDlSYkgvK3lNS2J5NEhkOFRvVThMU2kKY\
+ 3Uxajh3cz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
+ server: https://192.168.0.22:16443
+ name: microk8s-cluster
+contexts:
+- context:
+ cluster: microk8s-cluster
+ user: admin
+ name: microk8s
+current-context: microk8s
+kind: Config
+preferences: {}
+users:
+- name: admin
+ user:
+ token: clhkRExRem5Xd1dCdnFEVXdvRGtDRGE5b1F3WnNrZk5qeHFCOU10bHBZRT0K
+"""
+
+
async def AsyncMockFunc():
await asyncio.sleep(1)
@@ -91,6 +135,15 @@
status = "ready"
+class FakeModel:
+ def __init__(self, applications: dict = {}):
+ self._applications = applications
+
+ @property
+ def applications(self):
+ return self._applications
+
+
class FakeUnit(MagicMock):
async def is_leader_from_status(self):
return True
@@ -100,7 +153,6 @@
class FakeApplication(AsyncMock):
-
async def set_config(self, config):
pass
@@ -113,6 +165,25 @@
units = [FakeUnit(), FakeUnit()]
+class FakeFile:
+ def __init__(self, content: str = ""):
+ self.content = content
+
+ def read(self, size: int = -1):
+ return self.content
+
+
+class FakeFileWrapper:
+ def __init__(self, content: str = ""):
+ self.file = FakeFile(content=content)
+
+ def __enter__(self):
+ return self.file
+
+ def __exit__(self, type, value, traceback):
+ pass
+
+
FAKE_DELTA_MACHINE_PENDING = Dict(
{
"deltas": ["machine", "change", {}],