import asyncio
import concurrent
-from .exceptions import NotImplemented
+import os
+import uuid
+import yaml
-import io
import juju
-# from juju.bundle import BundleHandler
from juju.controller import Controller
from juju.model import Model
-from juju.errors import JujuAPIError, JujuError
-
-import logging
-
+from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
+from n2vc.kubectl import Kubectl
+from .exceptions import MethodNotImplemented
-import os
+
+# from juju.bundle import BundleHandler
# import re
# import ssl
# from .vnf import N2VC
-
-import uuid
-import yaml
-
-
class K8sJujuConnector(K8sConnector):
-
def __init__(
- self,
- fs: object,
- db: object,
- kubectl_command: str = '/usr/bin/kubectl',
- juju_command: str = '/usr/bin/juju',
- log=None,
- on_update_db=None,
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ juju_command: str = "/usr/bin/juju",
+ log: object = None,
+ on_update_db=None,
):
"""
# parent class
K8sConnector.__init__(
- self,
- db,
- log=log,
- on_update_db=on_update_db,
+ self, db, log=log, on_update_db=on_update_db,
)
self.fs = fs
- self.info('Initializing K8S Juju connector')
+ self.log.debug("Initializing K8S Juju connector")
self.authenticated = False
self.models = {}
- self.log = logging.getLogger(__name__)
self.juju_command = juju_command
self.juju_secret = ""
- self.info('K8S Juju connector initialized')
+ self.log.debug("K8S Juju connector initialized")
"""Initialization"""
+
async def init_env(
self,
k8s_creds: str,
- namespace: str = 'kube-system',
+ namespace: str = "kube-system",
reuse_cluster_uuid: str = None,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Juju bundles.
- :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
- :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+ '.kube/config'
+ :param namespace: optional namespace to be used for juju. By default,
+ 'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
- :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
- (on error, an exception will be raised)
+ :return: uuid of the K8s cluster and True if connector has installed some
+ software in the cluster
+ (on error, an exception will be raised)
"""
"""Bootstrapping
# Name the new k8s cloud
k8s_cloud = "k8s-{}".format(cluster_uuid)
- print("Adding k8s cloud {}".format(k8s_cloud))
+ self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
await self.add_k8s(k8s_cloud, k8s_creds)
# Bootstrap Juju controller
- print("Bootstrapping...")
+ self.log.debug("Bootstrapping...")
await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
- print("Bootstrap done.")
+ self.log.debug("Bootstrap done.")
# Get the controller information
# Parse ~/.local/share/juju/controllers.yaml
# controllers.testing.api-endpoints|ca-cert|uuid
- print("Getting controller endpoints")
- with open(os.path.expanduser(
- "~/.local/share/juju/controllers.yaml"
- )) as f:
+ self.log.debug("Getting controller endpoints")
+ with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers['controllers'][cluster_uuid]
- endpoints = controller['api-endpoints']
+ controller = controllers["controllers"][cluster_uuid]
+ endpoints = controller["api-endpoints"]
self.juju_endpoint = endpoints[0]
- self.juju_ca_cert = controller['ca-cert']
+ self.juju_ca_cert = controller["ca-cert"]
# Parse ~/.local/share/juju/accounts
# controllers.testing.user|password
- print("Getting accounts")
- with open(os.path.expanduser(
- "~/.local/share/juju/accounts.yaml"
- )) as f:
+ self.log.debug("Getting accounts")
+ with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers['controllers'][cluster_uuid]
+ controller = controllers["controllers"][cluster_uuid]
- self.juju_user = controller['user']
- self.juju_secret = controller['password']
-
- print("user: {}".format(self.juju_user))
- print("secret: {}".format(self.juju_secret))
- print("endpoint: {}".format(self.juju_endpoint))
- print("ca-cert: {}".format(self.juju_ca_cert))
+ self.juju_user = controller["user"]
+ self.juju_secret = controller["password"]
# raise Exception("EOL")
self.juju_public_key = None
config = {
- 'endpoint': self.juju_endpoint,
- 'username': self.juju_user,
- 'secret': self.juju_secret,
- 'cacert': self.juju_ca_cert,
- 'namespace': namespace,
- 'loadbalancer': loadbalancer,
+ "endpoint": self.juju_endpoint,
+ "username": self.juju_user,
+ "secret": self.juju_secret,
+ "cacert": self.juju_ca_cert,
+ "namespace": namespace,
+ "loadbalancer": loadbalancer,
}
# Store the cluster configuration so it
# can be used for subsequent calls
- print("Setting config")
+ self.log.debug("Setting config")
await self.set_config(cluster_uuid, config)
else:
config = self.get_config(cluster_uuid)
- self.juju_endpoint = config['endpoint']
- self.juju_user = config['username']
- self.juju_secret = config['secret']
- self.juju_ca_cert = config['cacert']
+ self.juju_endpoint = config["endpoint"]
+ self.juju_user = config["username"]
+ self.juju_secret = config["secret"]
+ self.juju_ca_cert = config["cacert"]
self.juju_public_key = None
# Login to the k8s cluster
await self.login(cluster_uuid)
# We're creating a new cluster
- print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
- model = await self.get_model(
- self.get_namespace(cluster_uuid),
- cluster_uuid=cluster_uuid
- )
+ # print("Getting model {}".format(self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid))
+ # model = await self.get_model(
+ # self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid
+ # )
# Disconnect from the model
- if model and model.is_connected():
- await model.disconnect()
+ # if model and model.is_connected():
+ # await model.disconnect()
return cluster_uuid, True
"""Repo Management"""
+
async def repo_add(
- self,
- name: str,
- url: str,
- type: str = "charm",
+ self, name: str, url: str, _type: str = "charm",
):
- raise NotImplemented()
+ raise MethodNotImplemented()
async def repo_list(self):
- raise NotImplemented()
+ raise MethodNotImplemented()
async def repo_remove(
- self,
- name: str,
+ self, name: str,
):
- raise NotImplemented()
+ raise MethodNotImplemented()
+
+ async def synchronize_repos(self, cluster_uuid: str, name: str):
+ """
+        Return None, since add_repo is currently not implemented.
+ """
+ return None
"""Reset"""
+
async def reset(
- self,
- cluster_uuid: str,
- force: bool = False,
- uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
"""Reset a cluster
# Destroy the model
namespace = self.get_namespace(cluster_uuid)
if await self.has_model(namespace):
- print("[reset] Destroying model")
- await self.controller.destroy_model(
- namespace,
- destroy_storage=True
- )
+ self.log.debug("[reset] Destroying model")
+ await self.controller.destroy_model(namespace, destroy_storage=True)
# Disconnect from the controller
- print("[reset] Disconnecting controller")
- await self.controller.disconnect()
+ self.log.debug("[reset] Disconnecting controller")
+ await self.logout()
# Destroy the controller (via CLI)
- print("[reset] Destroying controller")
+ self.log.debug("[reset] Destroying controller")
await self.destroy_controller(cluster_uuid)
- print("[reset] Removing k8s cloud")
- namespace = self.get_namespace(cluster_uuid)
- k8s_cloud = "{}-k8s".format(namespace)
+ self.log.debug("[reset] Removing k8s cloud")
+ k8s_cloud = "k8s-{}".format(cluster_uuid)
await self.remove_cloud(k8s_cloud)
except Exception as ex:
- print("Caught exception during reset: {}".format(ex))
+ self.log.debug("Caught exception during reset: {}".format(ex))
+
+ return True
"""Deployment"""
atomic: bool = True,
timeout: float = 300,
params: dict = None,
- db_dict: dict = None
+ db_dict: dict = None,
+ kdu_name: str = None,
+ namespace: str = None,
) -> bool:
"""Install a bundle
:param timeout int: The time, in seconds, to wait for the install
to finish
:param params dict: Key-value pairs of instantiation parameters
+ :param kdu_name: Name of the KDU instance to be installed
+ :param namespace: K8s namespace to use for the KDU instance
:return: If successful, returns ?
"""
if not self.authenticated:
- print("[install] Logging in to the controller")
+ self.log.debug("[install] Logging in to the controller")
await self.login(cluster_uuid)
##
# Get or create the model, based on the NS
# uuid.
- model_name = db_dict["filter"]["_id"]
+ if kdu_name:
+ kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
+ else:
+ kdu_instance = db_dict["filter"]["_id"]
+
+ self.log.debug("Checking for model named {}".format(kdu_instance))
- self.log.debug("Checking for model named {}".format(model_name))
- model = await self.get_model(model_name, cluster_uuid=cluster_uuid)
- if not model:
- # Create the new model
- self.log.debug("Adding model: {}".format(model_name))
- model = await self.add_model(model_name, cluster_uuid=cluster_uuid)
+ # Create the new model
+ self.log.debug("Adding model: {}".format(kdu_instance))
+ model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
if model:
# TODO: Instantiation parameters
"Juju bundle that models the KDU, in any of the following ways:
- <juju-repo>/<juju-bundle>
- <juju-bundle folder under k8s_models folder in the package>
- - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
+ - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
+ in the package>
- <URL_where_to_fetch_juju_bundle>
"""
+ try:
+ previous_workdir = os.getcwd()
+ except FileNotFoundError:
+ previous_workdir = "/app/storage"
bundle = kdu_model
if kdu_model.startswith("cs:"):
# Download the file
pass
else:
- # Local file
+ new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
- # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
- # Uncompress temporarily
- # bundle = <uncompressed file>
- pass
+ os.chdir(new_workdir)
+
+ bundle = "local:{}".format(kdu_model)
if not bundle:
# Raise named exception that the bundle could not be found
raise Exception()
- print("[install] deploying {}".format(bundle))
+ self.log.debug("[install] deploying {}".format(bundle))
await model.deploy(bundle)
# Get the application
if atomic:
# applications = model.applications
- print("[install] Applications: {}".format(model.applications))
+ self.log.debug("[install] Applications: {}".format(model.applications))
for name in model.applications:
- print("[install] Waiting for {} to settle".format(name))
+ self.log.debug("[install] Waiting for {} to settle".format(name))
application = model.applications[name]
try:
# It's not enough to wait for all units to be active;
# the application status needs to be active as well.
- print("Waiting for all units to be active...")
+ self.log.debug("Waiting for all units to be active...")
await model.block_until(
lambda: all(
- unit.agent_status == 'idle'
- and application.status in ['active', 'unknown']
- and unit.workload_status in [
- 'active', 'unknown'
- ] for unit in application.units
+ unit.agent_status == "idle"
+ and application.status in ["active", "unknown"]
+ and unit.workload_status in ["active", "unknown"]
+ for unit in application.units
),
- timeout=timeout
+ timeout=timeout,
)
- print("All units active.")
+ self.log.debug("All units active.")
+ # TODO use asyncio.TimeoutError
except concurrent.futures._base.TimeoutError:
- print("[install] Timeout exceeded; resetting cluster")
+ os.chdir(previous_workdir)
+ self.log.debug("[install] Timeout exceeded; resetting cluster")
await self.reset(cluster_uuid)
return False
# Wait for the application to be active
if model.is_connected():
- print("[install] Disconnecting model")
+ self.log.debug("[install] Disconnecting model")
await model.disconnect()
- return True
+ os.chdir(previous_workdir)
+
+ return kdu_instance
raise Exception("Unable to install")
- async def instances_list(
- self,
- cluster_uuid: str
- ) -> list:
+ async def instances_list(self, cluster_uuid: str) -> list:
"""
returns a list of deployed releases in a cluster
namespace = self.get_namespace(cluster_uuid)
model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
- with open(kdu_model, 'r') as f:
+ with open(kdu_model, "r") as f:
bundle = yaml.safe_load(f)
"""
}
"""
# TODO: This should be returned in an agreed-upon format
- for name in bundle['applications']:
- print(model.applications)
+ for name in bundle["applications"]:
+ self.log.debug(model.applications)
application = model.applications[name]
- print(application)
+ self.log.debug(application)
- path = bundle['applications'][name]['charm']
+ path = bundle["applications"][name]["charm"]
try:
await application.upgrade_charm(switch=path)
except juju.errors.JujuError as ex:
- if 'already running charm' in str(ex):
+ if "already running charm" in str(ex):
# We're already running this version
pass
await model.disconnect()
return True
- raise NotImplemented()
+ raise MethodNotImplemented()
"""Rollback"""
+
async def rollback(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- revision: int = 0,
+ self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
) -> str:
"""Rollback a model
:return: If successful, returns the revision of active KDU instance,
or raises an exception
"""
- raise NotImplemented()
+ raise MethodNotImplemented()
"""Deletion"""
- async def uninstall(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- ) -> bool:
+
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
"""Uninstall a KDU instance
- :param cluster_uuid str: The UUID of the cluster to uninstall
+ :param cluster_uuid str: The UUID of the cluster
:param kdu_instance str: The unique name of the KDU instance
:return: Returns True if successful, or raises an exception
"""
- removed = False
+ if not self.authenticated:
+ self.log.debug("[uninstall] Connecting to controller")
+ await self.login(cluster_uuid)
- # Remove an application from the model
- model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
+ self.log.debug("[uninstall] Destroying model")
- if model:
- # Get the application
- if kdu_instance not in model.applications:
- # TODO: Raise a named exception
- raise Exception("Application not found.")
+ await self.controller.destroy_models(kdu_instance)
+
+ self.log.debug("[uninstall] Model destroyed and disconnecting")
+ await self.logout()
+
+ return True
+
+ async def exec_primitive(
+ self,
+ cluster_uuid: str = None,
+ kdu_instance: str = None,
+ primitive_name: str = None,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ ) -> str:
+ """Exec primitive (Juju action)
+
+ :param cluster_uuid str: The UUID of the cluster
+ :param kdu_instance str: The unique name of the KDU instance
+ :param primitive_name: Name of action that will be executed
+ :param timeout: Timeout for action execution
+ :param params: Dictionary of all the parameters needed for the action
+        :param db_dict: Dictionary for any additional data
+
+ :return: Returns the output of the action
+ """
+ if not self.authenticated:
+ self.log.debug("[exec_primitive] Connecting to controller")
+ await self.login(cluster_uuid)
+
+ if not params or "application-name" not in params:
+ raise K8sException(
+ "Missing application-name argument, \
+ argument needed for K8s actions"
+ )
+ try:
+ self.log.debug(
+ "[exec_primitive] Getting model "
+ "kdu_instance: {}".format(kdu_instance)
+ )
+
+ model = await self.get_model(kdu_instance, cluster_uuid)
+
+ application_name = params["application-name"]
+ application = model.applications[application_name]
+
+ actions = await application.get_actions()
+ if primitive_name not in actions:
+ raise K8sException("Primitive {} not found".format(primitive_name))
- application = model.applications[kdu_instance]
+ unit = None
+ for u in application.units:
+ if await u.is_leader_from_status():
+ unit = u
+ break
+
+ if unit is None:
+ raise K8sException("No leader unit found to execute action")
+
+ self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
+ action = await unit.run_action(primitive_name, **params)
+
+ output = await model.get_action_output(action_uuid=action.entity_id)
+ status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+
+ status = (
+ status[action.entity_id] if action.entity_id in status else "failed"
+ )
- # Destroy the application
- await application.destroy()
+ if status != "completed":
+ raise K8sException(
+ "status is not completed: {} output: {}".format(status, output)
+ )
- # TODO: Verify removal
+ return output
- removed = True
- return removed
+ except Exception as e:
+ error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
"""Introspection"""
- async def inspect_kdu(
- self,
- kdu_model: str,
- ) -> dict:
+
+ async def inspect_kdu(self, kdu_model: str,) -> dict:
"""Inspect a KDU
Inspects a bundle and returns a dictionary of config parameters and
"""
kdu = {}
- with open(kdu_model, 'r') as f:
+ with open(kdu_model, "r") as f:
bundle = yaml.safe_load(f)
"""
}
"""
# TODO: This should be returned in an agreed-upon format
- kdu = bundle['applications']
+ kdu = bundle["applications"]
return kdu
- async def help_kdu(
- self,
- kdu_model: str,
- ) -> str:
+ async def help_kdu(self, kdu_model: str,) -> str:
"""View the README
If available, returns the README of the bundle.
"""
readme = None
- files = ['README', 'README.txt', 'README.md']
+ files = ["README", "README.txt", "README.md"]
path = os.path.dirname(kdu_model)
for file in os.listdir(path):
if file in files:
- with open(file, 'r') as f:
+ with open(file, "r") as f:
readme = f.read()
break
return readme
- async def status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- ) -> dict:
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
"""Get the status of the KDU
Get the current status of the KDU instance.
"""
status = {}
- model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
+ model = await self.get_model(
+ self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid
+ )
# model = await self.get_model_by_uuid(cluster_uuid)
if model:
for name in model_status.applications:
application = model_status.applications[name]
- status[name] = {
- 'status': application['status']['status']
- }
+ status[name] = {"status": application["status"]["status"]}
if model.is_connected():
await model.disconnect()
return status
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
+ """Return a list of services of a kdu_instance"""
+
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+ return kubectl.get_services(
+ field_selector="metadata.namespace={}".format(kdu_instance)
+ )
+
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
+ """Return data for a specific service inside a namespace"""
+
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+
+ return kubectl.get_services(
+ field_selector="metadata.name={},metadata.namespace={}".format(
+ service_name, namespace
+ )
+ )[0]
+
# Private methods
- async def add_k8s(
- self,
- cloud_name: str,
- credentials: str,
- ) -> bool:
+ async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
"""Add a k8s cloud to Juju
Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
"""
cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
- print(cmd)
+ self.log.debug(cmd)
process = await asyncio.create_subprocess_exec(
*cmd,
await process.stdin.drain()
process.stdin.close()
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
- print("add-k8s return code: {}".format(return_code))
+ self.log.debug("add-k8s return code: {}".format(return_code))
if return_code > 0:
raise Exception(stderr)
return True
- async def add_model(
- self,
- model_name: str,
- cluster_uuid: str,
- ) -> juju.model.Model:
+ async def add_model(self, model_name: str, cluster_uuid: str,) -> Model:
"""Adds a model to the controller
Adds a new model to the Juju controller
if not self.authenticated:
await self.login(cluster_uuid)
- self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
- model = await self.controller.add_model(
- model_name,
- config={'authorized-keys': self.juju_public_key}
+ self.log.debug(
+ "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
)
+ try:
+ if self.juju_public_key is not None:
+ model = await self.controller.add_model(
+ model_name, config={"authorized-keys": self.juju_public_key}
+ )
+ else:
+ model = await self.controller.add_model(model_name)
+ except Exception as ex:
+ self.log.debug(ex)
+ self.log.debug("Caught exception: {}".format(ex))
+ pass
+
return model
async def bootstrap(
- self,
- cloud_name: str,
- cluster_uuid: str,
- loadbalancer: bool
+ self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
) -> bool:
"""Bootstrap a Kubernetes controller
cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
else:
"""
- For public clusters, specify that the controller service is using a LoadBalancer.
+ For public clusters, specify that the controller service is using a
+ LoadBalancer.
"""
- cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]
-
- print("Bootstrapping controller {} in cloud {}".format(
- cluster_uuid, cloud_name
- ))
+ cmd = [
+ self.juju_command,
+ "bootstrap",
+ cloud_name,
+ cluster_uuid,
+ "--config",
+ "controller-service-type=loadbalancer",
+ ]
+
+ self.log.debug(
+ "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
+ )
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
if return_code > 0:
#
- if b'already exists' not in stderr:
+ if b"already exists" not in stderr:
raise Exception(stderr)
return True
- async def destroy_controller(
- self,
- cluster_uuid: str
- ) -> bool:
+ async def destroy_controller(self, cluster_uuid: str) -> bool:
"""Destroy a Kubernetes controller
Destroy an existing Kubernetes controller.
"--destroy-all-models",
"--destroy-storage",
"-y",
- cluster_uuid
+ cluster_uuid,
]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
if return_code > 0:
#
- if 'already exists' not in stderr:
+ if "already exists" not in stderr:
raise Exception(stderr)
- def get_config(
- self,
- cluster_uuid: str,
- ) -> dict:
+ def get_config_file(self, cluster_uuid: str) -> str:
+ """
+ Get Cluster Kubeconfig location
+ """
+ return "{}/{}/.kube/config".format(self.fs.path, cluster_uuid)
+
+ def get_config(self, cluster_uuid: str,) -> dict:
"""Get the cluster configuration
Gets the configuration of the cluster
"""
cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
if os.path.exists(cluster_config):
- with open(cluster_config, 'r') as f:
+ with open(cluster_config, "r") as f:
config = yaml.safe_load(f.read())
return config
else:
raise Exception(
- "Unable to locate configuration for cluster {}".format(
- cluster_uuid
- )
+ "Unable to locate configuration for cluster {}".format(cluster_uuid)
)
- async def get_model(
- self,
- model_name: str,
- cluster_uuid: str,
- ) -> juju.model.Model:
+ async def get_model(self, model_name: str, cluster_uuid: str,) -> Model:
"""Get a model from the Juju Controller.
Note: Model objects returned must call disconnected() before it goes
model = None
models = await self.controller.list_models()
- self.log.debug(models)
if model_name in models:
self.log.debug("Found model: {}".format(model_name))
- model = await self.controller.get_model(
- model_name
- )
+ model = await self.controller.get_model(model_name)
return model
- def get_namespace(
- self,
- cluster_uuid: str,
- ) -> str:
+ def get_namespace(self, cluster_uuid: str,) -> str:
"""Get the namespace UUID
Gets the namespace's unique name
config = self.get_config(cluster_uuid)
# Make sure the name is in the config
- if 'namespace' not in config:
+ if "namespace" not in config:
raise Exception("Namespace not found.")
# TODO: We want to make sure this is unique to the cluster, in case
# the cluster is being reused.
# Consider pre/appending the cluster id to the namespace string
- return config['namespace']
+ return config["namespace"]
- async def has_model(
- self,
- model_name: str
- ) -> bool:
+ async def has_model(self, model_name: str) -> bool:
"""Check if a model exists in the controller
Checks to see if a model exists in the connected Juju controller.
return True
return False
- def is_local_k8s(
- self,
- credentials: str,
- ) -> bool:
+ def is_local_k8s(self, credentials: str,) -> bool:
"""Check if a cluster is local
Checks if a cluster is running in the local host
host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
if creds and host_ip:
- for cluster in creds['clusters']:
- if 'server' in cluster['cluster']:
- if host_ip in cluster['cluster']['server']:
+ for cluster in creds["clusters"]:
+ if "server" in cluster["cluster"]:
+ if host_ip in cluster["cluster"]["server"]:
return True
return False
# Test: Make sure we have the credentials loaded
config = self.get_config(cluster_uuid)
- self.juju_endpoint = config['endpoint']
- self.juju_user = config['username']
- self.juju_secret = config['secret']
- self.juju_ca_cert = config['cacert']
+ self.juju_endpoint = config["endpoint"]
+ self.juju_user = config["username"]
+ self.juju_secret = config["secret"]
+ self.juju_ca_cert = config["cacert"]
self.juju_public_key = None
self.controller = Controller()
if self.juju_secret:
self.log.debug(
"Connecting to controller... ws://{} as {}/{}".format(
- self.juju_endpoint,
- self.juju_user,
- self.juju_secret,
+ self.juju_endpoint, self.juju_user, self.juju_secret,
)
)
try:
self.authenticated = True
self.log.debug("JujuApi: Logged into controller")
except Exception as ex:
- print(ex)
+ self.log.debug(ex)
self.log.debug("Caught exception: {}".format(ex))
pass
else:
async def logout(self):
"""Logout of the Juju controller."""
- print("[logout]")
+ self.log.debug("[logout]")
if not self.authenticated:
return False
for model in self.models:
- print("Logging out of model {}".format(model))
+ self.log.debug("Logging out of model {}".format(model))
await self.models[model].disconnect()
if self.controller:
- self.log.debug("Disconnecting controller {}".format(
- self.controller
- ))
+ self.log.debug("Disconnecting controller {}".format(self.controller))
await self.controller.disconnect()
self.controller = None
self.authenticated = False
- async def remove_cloud(
- self,
- cloud_name: str,
- ) -> bool:
+ async def remove_cloud(self, cloud_name: str,) -> bool:
"""Remove a k8s cloud from Juju
Removes a Kubernetes cloud from Juju.
# Remove the bootstrapped controller
cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
# Remove the cloud from the local config
cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
return True
- async def set_config(
- self,
- cluster_uuid: str,
- config: dict,
- ) -> bool:
+ async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
"""Save the cluster configuration
Saves the cluster information to the file store
cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
if not os.path.exists(cluster_config):
- print("Writing config to {}".format(cluster_config))
- with open(cluster_config, 'w') as f:
+ self.log.debug("Writing config to {}".format(cluster_config))
+ with open(cluster_config, "w") as f:
f.write(yaml.dump(config, Dumper=yaml.Dumper))
return True