# limitations under the License.
import asyncio
-import concurrent
import os
import uuid
import yaml
+import tempfile
-from juju.controller import Controller
-from juju.model import Model
-from n2vc.exceptions import K8sException
+from n2vc.exceptions import K8sException, N2VCBadArgumentsException
from n2vc.k8s_conn import K8sConnector
from n2vc.kubectl import Kubectl
-from .exceptions import MethodNotImplemented, N2VCNotFound
+from .exceptions import MethodNotImplemented
+from n2vc.utils import base64_to_cacert
+from n2vc.libjuju import Libjuju
# from juju.bundle import BundleHandler
vca_config: dict = None,
):
"""
-
+ :param fs: file system for kubernetes and helm configuration
+ :param db: Database object
:param kubectl_command: path to kubectl executable
:param helm_command: path to helm executable
- :param fs: file system for kubernetes and helm configuration
:param log: logger
+ :param: loop: Asyncio loop
"""
# parent class
K8sConnector.__init__(
- self, db, log=log, on_update_db=on_update_db,
+ self,
+ db,
+ log=log,
+ on_update_db=on_update_db,
)
self.fs = fs
+ self.loop = loop or asyncio.get_event_loop()
self.log.debug("Initializing K8S Juju connector")
- self.juju_command = juju_command
- self.juju_public_key = None
-
+ required_vca_config = [
+ "host",
+ "user",
+ "secret",
+ "ca_cert",
+ ]
+ if not vca_config or not all(k in vca_config for k in required_vca_config):
+ raise N2VCBadArgumentsException(
+ message="Missing arguments in vca_config: {}".format(vca_config),
+ bad_args=required_vca_config,
+ )
+ port = vca_config["port"] if "port" in vca_config else 17070
+ url = "{}:{}".format(vca_config["host"], port)
+ enable_os_upgrade = vca_config.get("enable_os_upgrade", True)
+ apt_mirror = vca_config.get("apt_mirror", None)
+ username = vca_config["user"]
+ secret = vca_config["secret"]
+ ca_cert = base64_to_cacert(vca_config["ca_cert"])
+
+ self.libjuju = Libjuju(
+ endpoint=url,
+ api_proxy=None, # Not needed for k8s charms
+ enable_os_upgrade=enable_os_upgrade,
+ apt_mirror=apt_mirror,
+ username=username,
+ password=secret,
+ cacert=ca_cert,
+ loop=self.loop,
+ log=self.log,
+ db=self.db,
+ )
self.log.debug("K8S Juju connector initialized")
# TODO: Remove these commented lines:
# self.authenticated = False
(on error, an exception will be raised)
"""
- """Bootstrapping
+ # """Bootstrapping
- Bootstrapping cannot be done, by design, through the API. We need to
- use the CLI tools.
- """
+ # Bootstrapping cannot be done, by design, through the API. We need to
+ # use the CLI tools.
+ # """
- """
- WIP: Workflow
+ # """
+ # WIP: Workflow
- 1. Has the environment already been bootstrapped?
- - Check the database to see if we have a record for this env
+ # 1. Has the environment already been bootstrapped?
+ # - Check the database to see if we have a record for this env
- 2. If this is a new env, create it
- - Add the k8s cloud to Juju
- - Bootstrap
- - Record it in the database
+ # 2. If this is a new env, create it
+ # - Add the k8s cloud to Juju
+ # - Bootstrap
+ # - Record it in the database
- 3. Connect to the Juju controller for this cloud
+ # 3. Connect to the Juju controller for this cloud
- """
+ # """
# cluster_uuid = reuse_cluster_uuid
# if not cluster_uuid:
# cluster_uuid = str(uuid4())
cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
# Is a local k8s cluster?
- localk8s = self.is_local_k8s(k8s_creds)
+ # localk8s = self.is_local_k8s(k8s_creds)
# If the k8s is external, the juju controller needs a loadbalancer
- loadbalancer = False if localk8s else True
+ # loadbalancer = False if localk8s else True
# Name the new k8s cloud
- k8s_cloud = "k8s-{}".format(cluster_uuid)
+ # k8s_cloud = "k8s-{}".format(cluster_uuid)
- self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
- await self.add_k8s(k8s_cloud, k8s_creds)
+ # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
+ # await self.add_k8s(k8s_cloud, k8s_creds)
# Bootstrap Juju controller
- self.log.debug("Bootstrapping...")
- await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
- self.log.debug("Bootstrap done.")
+ # self.log.debug("Bootstrapping...")
+ # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
+ # self.log.debug("Bootstrap done.")
# Get the controller information
# Parse ~/.local/share/juju/controllers.yaml
# controllers.testing.api-endpoints|ca-cert|uuid
- self.log.debug("Getting controller endpoints")
- with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
- controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers["controllers"][cluster_uuid]
- endpoints = controller["api-endpoints"]
- juju_endpoint = endpoints[0]
- juju_ca_cert = controller["ca-cert"]
+ # self.log.debug("Getting controller endpoints")
+ # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
+ # controllers = yaml.load(f, Loader=yaml.Loader)
+ # controller = controllers["controllers"][cluster_uuid]
+ # endpoints = controller["api-endpoints"]
+ # juju_endpoint = endpoints[0]
+ # juju_ca_cert = controller["ca-cert"]
# Parse ~/.local/share/juju/accounts
# controllers.testing.user|password
- self.log.debug("Getting accounts")
- with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
- controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers["controllers"][cluster_uuid]
-
- juju_user = controller["user"]
- juju_secret = controller["password"]
-
- config = {
- "endpoint": juju_endpoint,
- "username": juju_user,
- "secret": juju_secret,
- "cacert": juju_ca_cert,
- "loadbalancer": loadbalancer,
- }
+ # self.log.debug("Getting accounts")
+ # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
+ # controllers = yaml.load(f, Loader=yaml.Loader)
+ # controller = controllers["controllers"][cluster_uuid]
+
+ # juju_user = controller["user"]
+ # juju_secret = controller["password"]
+
+ # config = {
+ # "endpoint": juju_endpoint,
+ # "username": juju_user,
+ # "secret": juju_secret,
+ # "cacert": juju_ca_cert,
+ # "loadbalancer": loadbalancer,
+ # }
# Store the cluster configuration so it
# can be used for subsequent calls
- self.log.debug("Setting config")
- await self.set_config(cluster_uuid, config)
+
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(k8s_creds)
+ kubectl = Kubectl(config_file=kubecfg.name)
+ configuration = kubectl.get_configuration()
+ default_storage_class = kubectl.get_default_storage_class()
+ await self.libjuju.add_k8s(
+ name=cluster_uuid,
+ configuration=configuration,
+ storage_class=default_storage_class,
+ credential_name=self._get_credential_name(cluster_uuid),
+ )
+ # self.log.debug("Setting config")
+ # await self.set_config(cluster_uuid, config)
# Test connection
- controller = await self.get_controller(cluster_uuid)
- await controller.disconnect()
+ # controller = await self.get_controller(cluster_uuid)
+ # await controller.disconnect()
# TODO: Remove these commented lines
# raise Exception("EOL")
"""Repo Management"""
async def repo_add(
- self, name: str, url: str, _type: str = "charm",
+ self,
+ name: str,
+ url: str,
+ _type: str = "charm",
):
raise MethodNotImplemented()
raise MethodNotImplemented()
async def repo_remove(
- self, name: str,
+ self,
+ name: str,
):
raise MethodNotImplemented()
try:
# Remove k8scluster from database
- self.log.debug("[reset] Removing k8scluster from juju database")
- juju_db = self.db.get_one("admin", {"_id": "juju"})
-
- for k in juju_db["k8sclusters"]:
- if k["_id"] == cluster_uuid:
- juju_db["k8sclusters"].remove(k)
- self.db.set_one(
- table="admin",
- q_filter={"_id": "juju"},
- update_dict={"k8sclusters": juju_db["k8sclusters"]},
- )
- break
+ # self.log.debug("[reset] Removing k8scluster from juju database")
+ # juju_db = self.db.get_one("admin", {"_id": "juju"})
+
+ # for k in juju_db["k8sclusters"]:
+ # if k["_id"] == cluster_uuid:
+ # juju_db["k8sclusters"].remove(k)
+ # self.db.set_one(
+ # table="admin",
+ # q_filter={"_id": "juju"},
+ # update_dict={"k8sclusters": juju_db["k8sclusters"]},
+ # )
+ # break
# Destroy the controller (via CLI)
- self.log.debug("[reset] Destroying controller")
- await self.destroy_controller(cluster_uuid)
+ # self.log.debug("[reset] Destroying controller")
+ # await self.destroy_controller(cluster_uuid)
self.log.debug("[reset] Removing k8s cloud")
- k8s_cloud = "k8s-{}".format(cluster_uuid)
- await self.remove_cloud(k8s_cloud)
+ # k8s_cloud = "k8s-{}".format(cluster_uuid)
+ # await self.remove_cloud(k8s_cloud)
+ await self.libjuju.remove_cloud(cluster_uuid)
- except Exception as ex:
- self.log.debug("Caught exception during reset: {}".format(ex))
+ except Exception as e:
+ self.log.debug("Caught exception during reset: {}".format(e))
+ raise e
return True
# TODO: Remove these commented lines
# if not self.authenticated:
cluster_uuid: str,
kdu_model: str,
atomic: bool = True,
- timeout: float = 300,
+ timeout: float = 1800,
params: dict = None,
db_dict: dict = None,
kdu_name: str = None,
:return: If successful, returns ?
"""
+ bundle = kdu_model
- controller = await self.get_controller(cluster_uuid)
+ # controller = await self.get_controller(cluster_uuid)
##
# Get or create the model, based on the NS
# uuid.
+
+ if not db_dict:
+ raise K8sException("db_dict must be set")
+ if not bundle:
+ raise K8sException("bundle must be set")
+
+ if bundle.startswith("cs:"):
+ pass
+ elif bundle.startswith("http"):
+ # Download the file
+ pass
+ else:
+ new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
+ os.chdir(new_workdir)
+ bundle = "local:{}".format(kdu_model)
+
if kdu_name:
kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
else:
# Create the new model
self.log.debug("Adding model: {}".format(kdu_instance))
- model = await self.add_model(
- kdu_instance, cluster_uuid=cluster_uuid, controller=controller
+ await self.libjuju.add_model(
+ model_name=kdu_instance,
+ cloud_name=cluster_uuid,
+ credential_name=self._get_credential_name(cluster_uuid),
)
- if model:
- # TODO: Instantiation parameters
+ # if model:
+ # TODO: Instantiation parameters
- """
- "Juju bundle that models the KDU, in any of the following ways:
- - <juju-repo>/<juju-bundle>
- - <juju-bundle folder under k8s_models folder in the package>
- - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
- in the package>
- - <URL_where_to_fetch_juju_bundle>
- """
- try:
- previous_workdir = os.getcwd()
- except FileNotFoundError:
- previous_workdir = "/app/storage"
-
- bundle = kdu_model
- if kdu_model.startswith("cs:"):
- bundle = kdu_model
- elif kdu_model.startswith("http"):
- # Download the file
- pass
- else:
- new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
-
- os.chdir(new_workdir)
-
- bundle = "local:{}".format(kdu_model)
-
- if not bundle:
- # Raise named exception that the bundle could not be found
- raise Exception()
-
- self.log.debug("[install] deploying {}".format(bundle))
- await model.deploy(bundle)
-
- # Get the application
- if atomic:
- # applications = model.applications
- self.log.debug("[install] Applications: {}".format(model.applications))
- for name in model.applications:
- self.log.debug("[install] Waiting for {} to settle".format(name))
- application = model.applications[name]
- try:
- # It's not enough to wait for all units to be active;
- # the application status needs to be active as well.
- self.log.debug("Waiting for all units to be active...")
- await model.block_until(
- lambda: all(
- unit.agent_status == "idle"
- and application.status in ["active", "unknown"]
- and unit.workload_status in ["active", "unknown"]
- for unit in application.units
- ),
- timeout=timeout,
- )
- self.log.debug("All units active.")
-
- # TODO use asyncio.TimeoutError
- except concurrent.futures._base.TimeoutError:
- os.chdir(previous_workdir)
- self.log.debug("[install] Timeout exceeded; resetting cluster")
- await self.reset(cluster_uuid)
- return False
-
- # Wait for the application to be active
- if model.is_connected():
- self.log.debug("[install] Disconnecting model")
- await model.disconnect()
- await controller.disconnect()
- os.chdir(previous_workdir)
-
- return kdu_instance
- raise Exception("Unable to install")
+ """
+ "Juju bundle that models the KDU, in any of the following ways:
+ - <juju-repo>/<juju-bundle>
+ - <juju-bundle folder under k8s_models folder in the package>
+ - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
+ in the package>
+ - <URL_where_to_fetch_juju_bundle>
+ """
+ try:
+ previous_workdir = os.getcwd()
+ except FileNotFoundError:
+ previous_workdir = "/app/storage"
+
+ self.log.debug("[install] deploying {}".format(bundle))
+ await self.libjuju.deploy(
+ bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
+ )
+
+ # Get the application
+ # if atomic:
+ # # applications = model.applications
+ # self.log.debug("[install] Applications: {}".format(model.applications))
+ # for name in model.applications:
+ # self.log.debug("[install] Waiting for {} to settle".format(name))
+ # application = model.applications[name]
+ # try:
+ # # It's not enough to wait for all units to be active;
+ # # the application status needs to be active as well.
+ # self.log.debug("Waiting for all units to be active...")
+ # await model.block_until(
+ # lambda: all(
+ # unit.agent_status == "idle"
+ # and application.status in ["active", "unknown"]
+ # and unit.workload_status in ["active", "unknown"]
+ # for unit in application.units
+ # ),
+ # timeout=timeout,
+ # )
+ # self.log.debug("All units active.")
+
+ # # TODO use asyncio.TimeoutError
+ # except concurrent.futures._base.TimeoutError:
+ # os.chdir(previous_workdir)
+ # self.log.debug("[install] Timeout exceeded; resetting cluster")
+ # await self.reset(cluster_uuid)
+ # return False
+
+ # Wait for the application to be active
+ # if model.is_connected():
+ # self.log.debug("[install] Disconnecting model")
+ # await model.disconnect()
+ # await controller.disconnect()
+ os.chdir(previous_workdir)
+
+ return kdu_instance
async def instances_list(self, cluster_uuid: str) -> list:
"""
"""Rollback"""
async def rollback(
- self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ revision: int = 0,
) -> str:
"""Rollback a model
:return: Returns True if successful, or raises an exception
"""
- controller = await self.get_controller(cluster_uuid)
+ # controller = await self.get_controller(cluster_uuid)
self.log.debug("[uninstall] Destroying model")
- await controller.destroy_models(kdu_instance)
+ await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
- self.log.debug("[uninstall] Model destroyed and disconnecting")
- await controller.disconnect()
+ # self.log.debug("[uninstall] Model destroyed and disconnecting")
+ # await controller.disconnect()
return True
# TODO: Remove these commented lines
:return: Returns the output of the action
"""
- controller = await self.get_controller(cluster_uuid)
+ # controller = await self.get_controller(cluster_uuid)
if not params or "application-name" not in params:
raise K8sException(
"[exec_primitive] Getting model "
"kdu_instance: {}".format(kdu_instance)
)
-
- model = await self.get_model(kdu_instance, controller=controller)
-
application_name = params["application-name"]
- application = model.applications[application_name]
-
- actions = await application.get_actions()
+ actions = await self.libjuju.get_actions(application_name, kdu_instance)
if primitive_name not in actions:
raise K8sException("Primitive {} not found".format(primitive_name))
+ output, status = await self.libjuju.execute_action(
+ application_name, kdu_instance, primitive_name, **params
+ )
+ # model = await self.get_model(kdu_instance, controller=controller)
- unit = None
- for u in application.units:
- if await u.is_leader_from_status():
- unit = u
- break
+ # application_name = params["application-name"]
+ # application = model.applications[application_name]
- if unit is None:
- raise K8sException("No leader unit found to execute action")
+ # actions = await application.get_actions()
+ # if primitive_name not in actions:
+ # raise K8sException("Primitive {} not found".format(primitive_name))
- self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
- action = await unit.run_action(primitive_name, **params)
+ # unit = None
+ # for u in application.units:
+ # if await u.is_leader_from_status():
+ # unit = u
+ # break
- output = await model.get_action_output(action_uuid=action.entity_id)
- status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+ # if unit is None:
+ # raise K8sException("No leader unit found to execute action")
- status = (
- status[action.entity_id] if action.entity_id in status else "failed"
- )
+ # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
+ # action = await unit.run_action(primitive_name, **params)
+
+ # output = await model.get_action_output(action_uuid=action.entity_id)
+ # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+
+ # status = (
+ # status[action.entity_id] if action.entity_id in status else "failed"
+ # )
if status != "completed":
raise K8sException(
error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
self.log.error(error_msg)
raise K8sException(message=error_msg)
- finally:
- await controller.disconnect()
+ # finally:
+ # await controller.disconnect()
# TODO: Remove these commented lines:
# if not self.authenticated:
# self.log.debug("[exec_primitive] Connecting to controller")
"""Introspection"""
- async def inspect_kdu(self, kdu_model: str,) -> dict:
+ async def inspect_kdu(
+ self,
+ kdu_model: str,
+ ) -> dict:
"""Inspect a KDU
Inspects a bundle and returns a dictionary of config parameters and
"""
kdu = {}
+ if not os.path.exists(kdu_model):
+ raise K8sException("file {} not found".format(kdu_model))
+
with open(kdu_model, "r") as f:
- bundle = yaml.safe_load(f)
+ bundle = yaml.safe_load(f.read())
"""
{
return kdu
- async def help_kdu(self, kdu_model: str,) -> str:
+ async def help_kdu(
+ self,
+ kdu_model: str,
+ ) -> str:
"""View the README
If available, returns the README of the bundle.
return readme
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
+ async def status_kdu(
+ self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ ) -> dict:
"""Get the status of the KDU
Get the current status of the KDU instance.
and deployment_time.
"""
status = {}
- controller = await self.get_controller(cluster_uuid)
- model = await self.get_model(kdu_instance, controller=controller)
-
- model_status = await model.get_status()
- status = model_status.applications
+ # controller = await self.get_controller(cluster_uuid)
+ # model = await self.get_model(kdu_instance, controller=controller)
+ # model_status = await model.get_status()
+ # status = model_status.applications
+ model_status = await self.libjuju.get_model_status(kdu_instance)
for name in model_status.applications:
application = model_status.applications[name]
status[name] = {"status": application["status"]["status"]}
- await model.disconnect()
- await controller.disconnect()
+ # await model.disconnect()
+ # await controller.disconnect()
return status
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
- config_path = "/tmp/{}".format(cluster_uuid)
- config_file = "{}/config".format(config_path)
+ # config_path = "/tmp/{}".format(cluster_uuid)
+ # config_file = "{}/config".format(config_path)
+
+ # if not os.path.exists(config_path):
+ # os.makedirs(config_path)
+ # with open(config_file, "w") as f:
+ # f.write(credentials)
- if not os.path.exists(config_path):
- os.makedirs(config_path)
- with open(config_file, "w") as f:
- f.write(credentials)
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(credentials)
+ kubectl = Kubectl(config_file=kubecfg.name)
- kubectl = Kubectl(config_file=config_file)
return kubectl.get_services(
field_selector="metadata.namespace={}".format(kdu_instance)
)
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
- config_path = "/tmp/{}".format(cluster_uuid)
- config_file = "{}/config".format(config_path)
+ # config_path = "/tmp/{}".format(cluster_uuid)
+ # config_file = "{}/config".format(config_path)
- if not os.path.exists(config_path):
- os.makedirs(config_path)
- with open(config_file, "w") as f:
- f.write(credentials)
+ # if not os.path.exists(config_path):
+ # os.makedirs(config_path)
+ # with open(config_file, "w") as f:
+ # f.write(credentials)
- kubectl = Kubectl(config_file=config_file)
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(credentials)
+ kubectl = Kubectl(config_file=kubecfg.name)
return kubectl.get_services(
field_selector="metadata.name={},metadata.namespace={}".format(
)[0]
# Private methods
- async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
- """Add a k8s cloud to Juju
+ # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
+ # """Add a k8s cloud to Juju
- Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
- Juju Controller.
+ # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
+ # Juju Controller.
- :param cloud_name str: The name of the cloud to add.
- :param credentials dict: A dictionary representing the output of
- `kubectl config view --raw`.
+ # :param cloud_name str: The name of the cloud to add.
+ # :param credentials dict: A dictionary representing the output of
+ # `kubectl config view --raw`.
- :returns: True if successful, otherwise raises an exception.
- """
-
- cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
- self.log.debug(cmd)
+ # :returns: True if successful, otherwise raises an exception.
+ # """
- process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- stdin=asyncio.subprocess.PIPE,
- )
+ # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
+ # self.log.debug(cmd)
- # Feed the process the credentials
- process.stdin.write(credentials.encode("utf-8"))
- await process.stdin.drain()
- process.stdin.close()
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd,
+ # stdout=asyncio.subprocess.PIPE,
+ # stderr=asyncio.subprocess.PIPE,
+ # stdin=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # # Feed the process the credentials
+ # process.stdin.write(credentials.encode("utf-8"))
+ # await process.stdin.drain()
+ # process.stdin.close()
- return_code = process.returncode
+ # _stdout, stderr = await process.communicate()
- self.log.debug("add-k8s return code: {}".format(return_code))
+ # return_code = process.returncode
- if return_code > 0:
- raise Exception(stderr)
+ # self.log.debug("add-k8s return code: {}".format(return_code))
- return True
+ # if return_code > 0:
+ # raise Exception(stderr)
- async def add_model(
- self, model_name: str, cluster_uuid: str, controller: Controller
- ) -> Model:
- """Adds a model to the controller
+ # return True
- Adds a new model to the Juju controller
+ # async def add_model(
+ # self, model_name: str, cluster_uuid: str, controller: Controller
+ # ) -> Model:
+ # """Adds a model to the controller
- :param model_name str: The name of the model to add.
- :param cluster_uuid str: ID of the cluster.
- :param controller: Controller object in which the model will be added
- :returns: The juju.model.Model object of the new model upon success or
- raises an exception.
- """
+ # Adds a new model to the Juju controller
- self.log.debug(
- "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
- )
- model = None
- try:
- if self.juju_public_key is not None:
- model = await controller.add_model(
- model_name, config={"authorized-keys": self.juju_public_key}
- )
- else:
- model = await controller.add_model(model_name)
- except Exception as ex:
- self.log.debug(ex)
- self.log.debug("Caught exception: {}".format(ex))
- pass
-
- return model
-
- async def bootstrap(
- self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
- ) -> bool:
- """Bootstrap a Kubernetes controller
+ # :param model_name str: The name of the model to add.
+ # :param cluster_uuid str: ID of the cluster.
+ # :param controller: Controller object in which the model will be added
+ # :returns: The juju.model.Model object of the new model upon success or
+ # raises an exception.
+ # """
- Bootstrap a Juju controller inside the Kubernetes cluster
+ # self.log.debug(
+ # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
+ # )
+ # model = None
+ # try:
+ # if self.juju_public_key is not None:
+ # model = await controller.add_model(
+ # model_name, config={"authorized-keys": self.juju_public_key}
+ # )
+ # else:
+ # model = await controller.add_model(model_name)
+ # except Exception as ex:
+ # self.log.debug(ex)
+ # self.log.debug("Caught exception: {}".format(ex))
+ # pass
+
+ # return model
+
+ # async def bootstrap(
+ # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
+ # ) -> bool:
+ # """Bootstrap a Kubernetes controller
+
+ # Bootstrap a Juju controller inside the Kubernetes cluster
+
+ # :param cloud_name str: The name of the cloud.
+ # :param cluster_uuid str: The UUID of the cluster to bootstrap.
+ # :param loadbalancer bool: If the controller should use loadbalancer or not.
+ # :returns: True upon success or raises an exception.
+ # """
- :param cloud_name str: The name of the cloud.
- :param cluster_uuid str: The UUID of the cluster to bootstrap.
- :param loadbalancer bool: If the controller should use loadbalancer or not.
- :returns: True upon success or raises an exception.
- """
+ # if not loadbalancer:
+ # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
+ # else:
+ # """
+ # For public clusters, specify that the controller service is using a
+ # LoadBalancer.
+ # """
+ # cmd = [
+ # self.juju_command,
+ # "bootstrap",
+ # cloud_name,
+ # cluster_uuid,
+ # "--config",
+ # "controller-service-type=loadbalancer",
+ # ]
- if not loadbalancer:
- cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
- else:
- """
- For public clusters, specify that the controller service is using a
- LoadBalancer.
- """
- cmd = [
- self.juju_command,
- "bootstrap",
- cloud_name,
- cluster_uuid,
- "--config",
- "controller-service-type=loadbalancer",
- ]
-
- self.log.debug(
- "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
- )
+ # self.log.debug(
+ # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
+ # )
- process = await asyncio.create_subprocess_exec(
- *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- )
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- if return_code > 0:
- #
- if b"already exists" not in stderr:
- raise Exception(stderr)
+ # if return_code > 0:
+ # #
+ # if b"already exists" not in stderr:
+ # raise Exception(stderr)
- return True
+ # return True
- async def destroy_controller(self, cluster_uuid: str) -> bool:
- """Destroy a Kubernetes controller
+ # async def destroy_controller(self, cluster_uuid: str) -> bool:
+ # """Destroy a Kubernetes controller
- Destroy an existing Kubernetes controller.
+ # Destroy an existing Kubernetes controller.
- :param cluster_uuid str: The UUID of the cluster to bootstrap.
- :returns: True upon success or raises an exception.
- """
- cmd = [
- self.juju_command,
- "destroy-controller",
- "--destroy-all-models",
- "--destroy-storage",
- "-y",
- cluster_uuid,
- ]
+ # :param cluster_uuid str: The UUID of the cluster to bootstrap.
+ # :returns: True upon success or raises an exception.
+ # """
+ # cmd = [
+ # self.juju_command,
+ # "destroy-controller",
+ # "--destroy-all-models",
+ # "--destroy-storage",
+ # "-y",
+ # cluster_uuid,
+ # ]
- process = await asyncio.create_subprocess_exec(
- *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- )
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- if return_code > 0:
- #
- if "already exists" not in stderr:
- raise Exception(stderr)
+ # if return_code > 0:
+ # #
+ # if "already exists" not in stderr:
+ # raise Exception(stderr)
def get_credentials(self, cluster_uuid: str) -> str:
"""
return yaml.safe_dump(k8scluster.get("credentials"))
- def get_config(self, cluster_uuid: str,) -> dict:
- """Get the cluster configuration
+ def _get_credential_name(self, cluster_uuid: str) -> str:
+ """
+ Get credential name for a k8s cloud
- Gets the configuration of the cluster
+ We cannot use the cluster_uuid for the credential name directly,
+ because it cannot start with a number, it must start with a letter.
+ Therefore, the k8s cloud credential name will be "cred-" followed
+ by the cluster uuid.
- :param cluster_uuid str: The UUID of the cluster.
- :return: A dict upon success, or raises an exception.
+ :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
+
+ :return: Name to use for the credential name.
"""
+ return "cred-{}".format(cluster_uuid)
- juju_db = self.db.get_one("admin", {"_id": "juju"})
- config = None
- for k in juju_db["k8sclusters"]:
- if k["_id"] == cluster_uuid:
- config = k["config"]
- self.db.encrypt_decrypt_fields(
- config,
- "decrypt",
- ["secret", "cacert"],
- schema_version="1.1",
- salt=k["_id"],
- )
- break
- if not config:
- raise Exception(
- "Unable to locate configuration for cluster {}".format(cluster_uuid)
- )
- return config
+ # def get_config(self, cluster_uuid: str,) -> dict:
+ # """Get the cluster configuration
- async def get_model(self, model_name: str, controller: Controller) -> Model:
- """Get a model from the Juju Controller.
+ # Gets the configuration of the cluster
- Note: Model objects returned must call disconnected() before it goes
- out of scope.
+ # :param cluster_uuid str: The UUID of the cluster.
+ # :return: A dict upon success, or raises an exception.
+ # """
- :param model_name str: The name of the model to get
- :param controller Controller: Controller object
- :return The juju.model.Model object if found, or None.
- """
+ # juju_db = self.db.get_one("admin", {"_id": "juju"})
+ # config = None
+ # for k in juju_db["k8sclusters"]:
+ # if k["_id"] == cluster_uuid:
+ # config = k["config"]
+ # self.db.encrypt_decrypt_fields(
+ # config,
+ # "decrypt",
+ # ["secret", "cacert"],
+ # schema_version="1.1",
+ # salt=k["_id"],
+ # )
+ # break
+ # if not config:
+ # raise Exception(
+ # "Unable to locate configuration for cluster {}".format(cluster_uuid)
+ # )
+ # return config
+
+ # async def get_model(self, model_name: str, controller: Controller) -> Model:
+ # """Get a model from the Juju Controller.
+
+ # Note: Model objects returned must call disconnected() before it goes
+ # out of scope.
+
+ # :param model_name str: The name of the model to get
+ # :param controller Controller: Controller object
+ # :return The juju.model.Model object if found, or None.
+ # """
- models = await controller.list_models()
- if model_name not in models:
- raise N2VCNotFound("Model {} not found".format(model_name))
- self.log.debug("Found model: {}".format(model_name))
- return await controller.get_model(model_name)
+ # models = await controller.list_models()
+ # if model_name not in models:
+ # raise N2VCNotFound("Model {} not found".format(model_name))
+ # self.log.debug("Found model: {}".format(model_name))
+ # return await controller.get_model(model_name)
- def get_namespace(self, cluster_uuid: str,) -> str:
+ def get_namespace(
+ self,
+ cluster_uuid: str,
+ ) -> str:
"""Get the namespace UUID
Gets the namespace's unique name
:param cluster_uuid str: The UUID of the cluster
:returns: The namespace UUID, or raises an exception
"""
- config = self.get_config(cluster_uuid)
+ # config = self.get_config(cluster_uuid)
# Make sure the name is in the config
- if "namespace" not in config:
- raise Exception("Namespace not found.")
+ # if "namespace" not in config:
+ # raise Exception("Namespace not found.")
# TODO: We want to make sure this is unique to the cluster, in case
# the cluster is being reused.
# Consider pre/appending the cluster id to the namespace string
- return config["namespace"]
+ pass
# TODO: Remove these lines of code
# async def has_model(self, model_name: str) -> bool:
# return True
# return False
- def is_local_k8s(self, credentials: str,) -> bool:
- """Check if a cluster is local
+ # def is_local_k8s(self, credentials: str,) -> bool:
+ # """Check if a cluster is local
- Checks if a cluster is running in the local host
-
- :param credentials dict: A dictionary containing the k8s credentials
- :returns: A boolean if the cluster is running locally
- """
+ # Checks if a cluster is running in the local host
- creds = yaml.safe_load(credentials)
-
- if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
- for cluster in creds["clusters"]:
- if "server" in cluster["cluster"]:
- if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
- return True
-
- return False
-
- async def get_controller(self, cluster_uuid):
- """Login to the Juju controller."""
+ # :param credentials dict: A dictionary containing the k8s credentials
+ # :returns: A boolean if the cluster is running locally
+ # """
- config = self.get_config(cluster_uuid)
+ # creds = yaml.safe_load(credentials)
- juju_endpoint = config["endpoint"]
- juju_user = config["username"]
- juju_secret = config["secret"]
- juju_ca_cert = config["cacert"]
+ # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
+ # for cluster in creds["clusters"]:
+ # if "server" in cluster["cluster"]:
+ # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
+ # return True
- controller = Controller()
+ # return False
- if juju_secret:
- self.log.debug(
- "Connecting to controller... ws://{} as {}".format(
- juju_endpoint, juju_user,
- )
- )
- try:
- await controller.connect(
- endpoint=juju_endpoint,
- username=juju_user,
- password=juju_secret,
- cacert=juju_ca_cert,
- )
- self.log.debug("JujuApi: Logged into controller")
- return controller
- except Exception as ex:
- self.log.debug(ex)
- self.log.debug("Caught exception: {}".format(ex))
- else:
- self.log.fatal("VCA credentials not configured.")
+ # async def get_controller(self, cluster_uuid):
+ # """Login to the Juju controller."""
+
+ # config = self.get_config(cluster_uuid)
+
+ # juju_endpoint = config["endpoint"]
+ # juju_user = config["username"]
+ # juju_secret = config["secret"]
+ # juju_ca_cert = config["cacert"]
+
+ # controller = Controller()
+
+ # if juju_secret:
+ # self.log.debug(
+ # "Connecting to controller... ws://{} as {}".format(
+ # juju_endpoint, juju_user,
+ # )
+ # )
+ # try:
+ # await controller.connect(
+ # endpoint=juju_endpoint,
+ # username=juju_user,
+ # password=juju_secret,
+ # cacert=juju_ca_cert,
+ # )
+ # self.log.debug("JujuApi: Logged into controller")
+ # return controller
+ # except Exception as ex:
+ # self.log.debug(ex)
+ # self.log.debug("Caught exception: {}".format(ex))
+ # else:
+ # self.log.fatal("VCA credentials not configured.")
# TODO: Remove these commented lines
# self.authenticated = False
# self.authenticated = False
- async def remove_cloud(self, cloud_name: str,) -> bool:
- """Remove a k8s cloud from Juju
+ # async def remove_cloud(self, cloud_name: str,) -> bool:
+ # """Remove a k8s cloud from Juju
- Removes a Kubernetes cloud from Juju.
+ # Removes a Kubernetes cloud from Juju.
- :param cloud_name str: The name of the cloud to add.
+ # :param cloud_name str: The name of the cloud to add.
- :returns: True if successful, otherwise raises an exception.
- """
+ # :returns: True if successful, otherwise raises an exception.
+ # """
- # Remove the bootstrapped controller
- cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
- process = await asyncio.create_subprocess_exec(
- *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- )
+ # # Remove the bootstrapped controller
+ # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- if return_code > 0:
- raise Exception(stderr)
+ # if return_code > 0:
+ # raise Exception(stderr)
- # Remove the cloud from the local config
- cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
- process = await asyncio.create_subprocess_exec(
- *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
- )
+ # # Remove the cloud from the local config
+ # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
+ # process = await asyncio.create_subprocess_exec(
+ # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+ # )
- _stdout, stderr = await process.communicate()
+ # _stdout, stderr = await process.communicate()
- return_code = process.returncode
+ # return_code = process.returncode
- if return_code > 0:
- raise Exception(stderr)
+ # if return_code > 0:
+ # raise Exception(stderr)
- return True
+ # return True
- async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
- """Save the cluster configuration
+ # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
+ # """Save the cluster configuration
- Saves the cluster information to the Mongo database
+ # Saves the cluster information to the Mongo database
- :param cluster_uuid str: The UUID of the cluster
- :param config dict: A dictionary containing the cluster configuration
- """
-
- juju_db = self.db.get_one("admin", {"_id": "juju"})
+ # :param cluster_uuid str: The UUID of the cluster
+ # :param config dict: A dictionary containing the cluster configuration
+ # """
- k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
- self.db.encrypt_decrypt_fields(
- config,
- "encrypt",
- ["secret", "cacert"],
- schema_version="1.1",
- salt=cluster_uuid,
- )
- k8sclusters.append({"_id": cluster_uuid, "config": config})
- self.db.set_one(
- table="admin",
- q_filter={"_id": "juju"},
- update_dict={"k8sclusters": k8sclusters},
- )
+ # juju_db = self.db.get_one("admin", {"_id": "juju"})
+
+ # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
+ # self.db.encrypt_decrypt_fields(
+ # config,
+ # "encrypt",
+ # ["secret", "cacert"],
+ # schema_version="1.1",
+ # salt=cluster_uuid,
+ # )
+ # k8sclusters.append({"_id": cluster_uuid, "config": config})
+ # self.db.set_one(
+ # table="admin",
+ # q_filter={"_id": "juju"},
+ # update_dict={"k8sclusters": k8sclusters},
+ # )
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import logging
+import asynctest
+from n2vc.k8s_juju_conn import K8sJujuConnector
+from osm_common import fslocal
+from .utils import kubeconfig, FakeModel, FakeFileWrapper
+from n2vc.exceptions import (
+ MethodNotImplemented,
+ K8sException,
+ N2VCBadArgumentsException,
+)
+from unittest.mock import Mock
+from .utils import AsyncMock
+
+
+class K8sJujuConnTestCase(asynctest.TestCase):
+ @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
+ @asynctest.mock.patch("juju.client.connector.Connector.connect")
+ @asynctest.mock.patch("juju.controller.Controller.connection")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.Libjuju")
+ def setUp(
+ self,
+ mock_libjuju=None,
+ mock_base64_to_cacert=None,
+ mock_connection=None,
+ mock_connect=None,
+ mock_update_endpoints=None,
+ ):
+ self.loop = asyncio.get_event_loop()
+ mock_libjuju.return_value = AsyncMock()
+ db = Mock()
+ vca_config = {
+ "secret": "secret",
+ "api_proxy": "api_proxy",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+
+ logging.disable(logging.CRITICAL)
+
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=vca_config,
+ on_update_db=None,
+ )
+
+
+class K8sJujuConnInitSuccessTestCase(asynctest.TestCase):
+ def setUp(
+ self,
+ ):
+ logging.disable(logging.CRITICAL)
+
+ @asynctest.mock.patch("juju.controller.Controller.update_endpoints")
+ @asynctest.mock.patch("juju.client.connector.Connector.connect")
+ @asynctest.mock.patch("juju.controller.Controller.connection")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.base64_to_cacert")
+ @asynctest.mock.patch("n2vc.libjuju.Libjuju.__init__")
+ def test_success(
+ self,
+ mock_libjuju=None,
+ mock_base64_to_cacert=None,
+ mock_connection=None,
+ mock_connect=None,
+ mock_update_endpoints=None,
+ ):
+ mock_libjuju.return_value = None
+ loop = asyncio.get_event_loop()
+ log = logging.getLogger()
+ db = Mock()
+ vca_config = {
+ "secret": "secret",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+ K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=log,
+ loop=self.loop,
+ vca_config=vca_config,
+ on_update_db=None,
+ )
+
+ mock_libjuju.assert_called_once_with(
+ endpoint="1.1.1.1:17070",
+ api_proxy=None, # Not needed for k8s charms
+ enable_os_upgrade=True,
+ apt_mirror=None,
+ username="user",
+ password="secret",
+ cacert=mock_base64_to_cacert.return_value,
+ loop=loop,
+ log=log,
+ db=db,
+ )
+
+
+class K8sJujuConnectorInitFailureTestCase(asynctest.TestCase):
+ def setUp(
+ self,
+ ):
+ self.loop = asyncio.get_event_loop()
+ logging.disable(logging.CRITICAL)
+ self.vca_config = {
+ "secret": "secret",
+ "api_proxy": "api_proxy",
+ "cloud": "cloud",
+ "k8s_cloud": "k8s_cloud",
+ "user": "user",
+ "host": "1.1.1.1",
+ "port": 17070,
+ "ca_cert": "cacert",
+ }
+
+ def test_missing_vca_config_host(self):
+ db = Mock()
+ self.vca_config.pop("host")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_user(self):
+ db = Mock()
+ self.vca_config.pop("user")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_secret(self):
+ db = Mock()
+ self.vca_config.pop("secret")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+ def test_missing_vca_config_ca_cert(self):
+ db = Mock()
+ self.vca_config.pop("ca_cert")
+ with self.assertRaises(N2VCBadArgumentsException):
+ self.k8s_juju_conn = K8sJujuConnector(
+ fs=fslocal.FsLocal(),
+ db=db,
+ log=None,
+ loop=self.loop,
+ vca_config=self.vca_config,
+ on_update_db=None,
+ )
+
+
+class InitEnvTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InitEnvTest, self).setUp()
+ self.k8s_juju_conn.libjuju.add_k8s = AsyncMock()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_with_cluster_uuid(
+ self,
+ mock_get_default_storage_class,
+ ):
+ reuse_cluster_uuid = "uuid"
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(
+ k8s_creds=kubeconfig, reuse_cluster_uuid=reuse_cluster_uuid
+ )
+ )
+
+ self.assertTrue(created)
+ self.assertEqual(uuid, reuse_cluster_uuid)
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_with_no_cluster_uuid(self, mock_get_default_storage_class):
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
+ )
+
+ self.assertTrue(created)
+ self.assertTrue(isinstance(uuid, str))
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_default_storage_class")
+ def test_init_env_exception(self, mock_get_default_storage_class):
+ self.k8s_juju_conn.libjuju.add_k8s.side_effect = Exception()
+ created = None
+ uuid = None
+ with self.assertRaises(Exception):
+ uuid, created = self.loop.run_until_complete(
+ self.k8s_juju_conn.init_env(k8s_creds=kubeconfig)
+ )
+
+ self.assertIsNone(created)
+ self.assertIsNone(uuid)
+ mock_get_default_storage_class.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_k8s.assert_called_once()
+
+
+class NotImplementedTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(NotImplementedTest, self).setUp()
+
+ def test_repo_add(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_add("", ""))
+
+ def test_repo_list(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_list())
+
+ def test_repo_remove(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.repo_remove(""))
+
+ def test_synchronize_repos(self):
+ self.assertIsNone(
+ self.loop.run_until_complete(self.k8s_juju_conn.synchronize_repos("", ""))
+ )
+
+ def test_upgrade(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.upgrade("", ""))
+
+ def test_rollback(self):
+ with self.assertRaises(MethodNotImplemented):
+ self.loop.run_until_complete(self.k8s_juju_conn.rollback("", ""))
+
+ def test_get_namespace(self):
+ self.assertIsNone(self.k8s_juju_conn.get_namespace(""))
+
+ def test_instances_list(self):
+ res = self.loop.run_until_complete(self.k8s_juju_conn.instances_list(""))
+ self.assertEqual(res, [])
+
+
+class ResetTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(ResetTest, self).setUp()
+ self.k8s_juju_conn.libjuju.remove_cloud = AsyncMock()
+
+ def test_success(self):
+ removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
+ self.assertTrue(removed)
+ self.k8s_juju_conn.libjuju.remove_cloud.assert_called_once()
+
+ def test_exception(self):
+ removed = None
+ self.k8s_juju_conn.libjuju.remove_cloud.side_effect = Exception()
+ with self.assertRaises(Exception):
+ removed = self.loop.run_until_complete(self.k8s_juju_conn.reset("uuid"))
+ self.assertIsNone(removed)
+ self.k8s_juju_conn.libjuju.remove_cloud.assert_called_once()
+
+
+@asynctest.mock.patch("os.chdir")
+class InstallTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InstallTest, self).setUp()
+ self.db_dict = {"filter": {"_id": "id"}}
+ self.local_bundle = "bundle"
+ self.cs_bundle = "cs:bundle"
+ self.http_bundle = "https://example.com/bundle.yaml"
+ self.kdu_name = "kdu_name"
+ self.cluster_uuid = "cluster"
+ self.k8s_juju_conn.libjuju.add_model = AsyncMock()
+ self.k8s_juju_conn.libjuju.deploy = AsyncMock()
+
+ def test_success_local(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.local_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.assertEqual(mock_chdir.call_count, 2)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ "local:{}".format(self.local_bundle),
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_cs(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_http(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.http_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.http_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_success_not_kdu_name(self, mock_chdir):
+ expected_kdu_instance = "id"
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_missing_db_dict(self, mock_chdir):
+ kdu_instance = None
+ with self.assertRaises(K8sException):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ timeout=1800,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_not_called()
+ self.k8s_juju_conn.libjuju.deploy.assert_not_called()
+
+ @asynctest.mock.patch("os.getcwd")
+ def test_getcwd_exception(self, mock_getcwd, mock_chdir):
+ mock_getcwd.side_effect = FileNotFoundError()
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.cs_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertEqual(kdu_instance, expected_kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ self.cs_bundle,
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+ def test_missing_bundle(self, mock_chdir):
+ kdu_instance = None
+ with self.assertRaises(K8sException):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ "",
+ atomic=True,
+ kdu_name=self.kdu_name,
+ timeout=1800,
+ db_dict=self.db_dict,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_not_called()
+ self.k8s_juju_conn.libjuju.deploy.assert_not_called()
+
+ def test_missing_exception(self, mock_chdir):
+ expected_kdu_instance = "{}-{}".format(self.kdu_name, "id")
+ kdu_instance = None
+ self.k8s_juju_conn.libjuju.deploy.side_effect = Exception()
+ with self.assertRaises(Exception):
+ kdu_instance = self.loop.run_until_complete(
+ self.k8s_juju_conn.install(
+ self.cluster_uuid,
+ self.local_bundle,
+ atomic=True,
+ kdu_name=self.kdu_name,
+ db_dict=self.db_dict,
+ timeout=1800,
+ )
+ )
+ self.assertIsNone(kdu_instance)
+ self.k8s_juju_conn.libjuju.add_model.assert_called_once_with(
+ model_name=expected_kdu_instance,
+ cloud_name=self.cluster_uuid,
+ credential_name="cred-{}".format(self.cluster_uuid),
+ )
+ self.k8s_juju_conn.libjuju.deploy.assert_called_once_with(
+ "local:{}".format(self.local_bundle),
+ model_name=expected_kdu_instance,
+ wait=True,
+ timeout=1800,
+ )
+
+
+class UninstallTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(UninstallTest, self).setUp()
+ self.k8s_juju_conn.libjuju.destroy_model = AsyncMock()
+
+ def test_success(self):
+ destroyed = self.loop.run_until_complete(
+ self.k8s_juju_conn.uninstall("cluster_uuid", "model_name")
+ )
+ self.assertTrue(destroyed)
+ self.k8s_juju_conn.libjuju.destroy_model.assert_called_once()
+
+ def test_exception(self):
+ destroyed = None
+ self.k8s_juju_conn.libjuju.destroy_model.side_effect = Exception()
+ with self.assertRaises(Exception):
+ destroyed = self.loop.run_until_complete(
+ self.k8s_juju_conn.uninstall("cluster_uuid", "model_name")
+ )
+ self.assertIsNone(destroyed)
+ self.k8s_juju_conn.libjuju.destroy_model.assert_called_once()
+
+
+class ExecPrimitivesTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(ExecPrimitivesTest, self).setUp()
+ self.action_name = "touch"
+ self.application_name = "myapp"
+ self.model_name = "model"
+ self.k8s_juju_conn.libjuju.get_actions = AsyncMock()
+ self.k8s_juju_conn.libjuju.execute_action = AsyncMock()
+
+ def test_success(self):
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (
+ "success",
+ "completed",
+ )
+
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertEqual(output, "success")
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+ def test_exception(self):
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.side_effect = Exception()
+ output = None
+
+ with self.assertRaises(Exception):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+ def test_missing_application_name_in_params(self):
+ params = {}
+ output = None
+
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_not_called()
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_params(self):
+ output = None
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_not_called()
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_action(self):
+ output = None
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (
+ "success",
+ "completed",
+ )
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, "non-existing-action", params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_not_called()
+
+ def test_missing_not_completed(self):
+ output = None
+ params = {"application-name": self.application_name}
+ self.k8s_juju_conn.libjuju.get_actions.return_value = [self.action_name]
+ self.k8s_juju_conn.libjuju.execute_action.return_value = (None, "failed")
+ with self.assertRaises(K8sException):
+ output = self.loop.run_until_complete(
+ self.k8s_juju_conn.exec_primitive(
+ "cluster", self.model_name, self.action_name, params=params
+ )
+ )
+
+ self.assertIsNone(output)
+ self.k8s_juju_conn.libjuju.get_actions.assert_called_once_with(
+ self.application_name, self.model_name
+ )
+ self.k8s_juju_conn.libjuju.execute_action.assert_called_once_with(
+ self.application_name, self.model_name, self.action_name, **params
+ )
+
+
+class InspectKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(InspectKduTest, self).setUp()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.path.exists")
+ def test_existing_file(self, mock_exists, mock_open):
+ mock_exists.return_value = True
+ content = """{
+ 'description': 'test bundle',
+ 'bundle': 'kubernetes',
+ 'applications': {'app':{ }, 'app2': { }}
+ }"""
+ mock_open.return_value = FakeFileWrapper(content=content)
+ kdu = self.loop.run_until_complete(self.k8s_juju_conn.inspect_kdu("model"))
+ self.assertEqual(kdu, {"app": {}, "app2": {}})
+ mock_exists.assert_called_once()
+ mock_open.assert_called_once()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.path.exists")
+ def test_not_existing_file(self, mock_exists, mock_open):
+ kdu = None
+ mock_exists.return_value = False
+ with self.assertRaises(K8sException):
+ kdu = self.loop.run_until_complete(self.k8s_juju_conn.inspect_kdu("model"))
+ self.assertEqual(kdu, None)
+ mock_exists.assert_called_once_with("model")
+ mock_open.assert_not_called()
+
+
+class HelpKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(HelpKduTest, self).setUp()
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.listdir")
+ def test_existing_file(self, mock_listdir, mock_open):
+ content = "Readme file content"
+ mock_open.return_value = FakeFileWrapper(content=content)
+ for file in ["README.md", "README.txt", "README"]:
+ mock_listdir.return_value = [file]
+ help = self.loop.run_until_complete(
+ self.k8s_juju_conn.help_kdu("kdu_instance")
+ )
+ self.assertEqual(help, content)
+
+ self.assertEqual(mock_listdir.call_count, 3)
+ self.assertEqual(mock_open.call_count, 3)
+
+ @asynctest.mock.patch("builtins.open")
+ @asynctest.mock.patch("os.listdir")
+ def test_not_existing_file(self, mock_listdir, mock_open):
+ for file in ["src/charm.py", "tox.ini", "requirements.txt"]:
+ mock_listdir.return_value = [file]
+ help = self.loop.run_until_complete(
+ self.k8s_juju_conn.help_kdu("kdu_instance")
+ )
+ self.assertEqual(help, None)
+
+ self.assertEqual(mock_listdir.call_count, 3)
+ self.assertEqual(mock_open.call_count, 0)
+
+
+class StatusKduTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(StatusKduTest, self).setUp()
+ self.k8s_juju_conn.libjuju.get_model_status = AsyncMock()
+
+ def test_success(self):
+ applications = {"app": {"status": {"status": "active"}}}
+ model = FakeModel(applications=applications)
+ self.k8s_juju_conn.libjuju.get_model_status.return_value = model
+ status = self.loop.run_until_complete(
+ self.k8s_juju_conn.status_kdu("cluster", "kdu_instance")
+ )
+ self.assertEqual(status, {"app": {"status": "active"}})
+ self.k8s_juju_conn.libjuju.get_model_status.assert_called_once()
+
+ def test_exception(self):
+ self.k8s_juju_conn.libjuju.get_model_status.side_effect = Exception()
+ status = None
+ with self.assertRaises(Exception):
+ status = self.loop.run_until_complete(
+ self.k8s_juju_conn.status_kdu("cluster", "kdu_instance")
+ )
+ self.assertIsNone(status)
+ self.k8s_juju_conn.libjuju.get_model_status.assert_called_once()
+
+
+class GetServicesTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetServicesTest, self).setUp()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
+ def test_success(self, mock_get_credentials, mock_get_services):
+ mock_get_credentials.return_value = kubeconfig
+ self.loop.run_until_complete(self.k8s_juju_conn.get_services("", "", ""))
+ mock_get_credentials.assert_called_once()
+ mock_get_services.assert_called_once()
+
+
+class GetServiceTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetServiceTest, self).setUp()
+
+ @asynctest.mock.patch("n2vc.kubectl.Kubectl.get_services")
+ @asynctest.mock.patch("n2vc.k8s_juju_conn.K8sJujuConnector.get_credentials")
+ def test_success(self, mock_get_credentials, mock_get_services):
+ mock_get_credentials.return_value = kubeconfig
+ self.loop.run_until_complete(self.k8s_juju_conn.get_service("", "", ""))
+ mock_get_credentials.assert_called_once()
+ mock_get_services.assert_called_once()
+
+
+class GetCredentialsTest(K8sJujuConnTestCase):
+ def setUp(self):
+ super(GetCredentialsTest, self).setUp()
+
+ @asynctest.mock.patch("yaml.safe_dump")
+ def test_success(self, mock_safe_dump):
+ self.k8s_juju_conn.db.get_one.return_value = {
+ "_id": "id",
+ "credentials": "credentials",
+ "schema_version": "2",
+ }
+ self.k8s_juju_conn.get_credentials("cluster_uuid")
+ self.k8s_juju_conn.db.get_one.assert_called_once()
+ self.k8s_juju_conn.db.encrypt_decrypt_fields.assert_called_once()
+ mock_safe_dump.assert_called_once()