import asyncio
import concurrent
-from .exceptions import NotImplemented
+import os
+import uuid
+import yaml
-import io
-import juju
-# from juju.bundle import BundleHandler
from juju.controller import Controller
from juju.model import Model
-from juju.errors import JujuAPIError, JujuError
-
-import logging
-
+from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
+from n2vc.kubectl import Kubectl
+from .exceptions import MethodNotImplemented, N2VCNotFound
-import os
+
+# from juju.bundle import BundleHandler
# import re
# import ssl
# from .vnf import N2VC
-
-import uuid
-import yaml
-
-
class K8sJujuConnector(K8sConnector):
-
def __init__(
- self,
- fs: object,
- db: object,
- kubectl_command: str = '/usr/bin/kubectl',
- juju_command: str = '/usr/bin/juju',
- log=None,
- on_update_db=None,
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ juju_command: str = "/usr/bin/juju",
+ log: object = None,
+ on_update_db=None,
):
"""
# parent class
K8sConnector.__init__(
- self,
- db,
- log=log,
- on_update_db=on_update_db,
+ self, db, log=log, on_update_db=on_update_db,
)
self.fs = fs
- self.info('Initializing K8S Juju connector')
-
- self.authenticated = False
- self.models = {}
- self.log = logging.getLogger(__name__)
+ self.log.debug("Initializing K8S Juju connector")
self.juju_command = juju_command
- self.juju_secret = ""
+ self.juju_public_key = None
- self.info('K8S Juju connector initialized')
+ self.log.debug("K8S Juju connector initialized")
+ # TODO: Remove these commented lines:
+ # self.authenticated = False
+ # self.models = {}
+ # self.juju_secret = ""
"""Initialization"""
+
async def init_env(
self,
k8s_creds: str,
- namespace: str = 'kube-system',
+ namespace: str = "kube-system",
reuse_cluster_uuid: str = None,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Juju bundles.
- :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
- :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+ '.kube/config'
+ :param namespace: optional namespace to be used for juju. By default,
+ 'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
- :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
- (on error, an exception will be raised)
+ :return: uuid of the K8s cluster and True if connector has installed some
+ software in the cluster
+ (on error, an exception will be raised)
"""
"""Bootstrapping
# reuse_cluster_uuid, e.g. to try to fix it. #
###################################################
- if not reuse_cluster_uuid:
- # This is a new cluster, so bootstrap it
-
- cluster_uuid = str(uuid.uuid4())
-
- # Is a local k8s cluster?
- localk8s = self.is_local_k8s(k8s_creds)
-
- # If the k8s is external, the juju controller needs a loadbalancer
- loadbalancer = False if localk8s else True
-
- # Name the new k8s cloud
- k8s_cloud = "k8s-{}".format(cluster_uuid)
-
- print("Adding k8s cloud {}".format(k8s_cloud))
- await self.add_k8s(k8s_cloud, k8s_creds)
-
- # Bootstrap Juju controller
- print("Bootstrapping...")
- await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
- print("Bootstrap done.")
-
- # Get the controller information
-
- # Parse ~/.local/share/juju/controllers.yaml
- # controllers.testing.api-endpoints|ca-cert|uuid
- print("Getting controller endpoints")
- with open(os.path.expanduser(
- "~/.local/share/juju/controllers.yaml"
- )) as f:
- controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers['controllers'][cluster_uuid]
- endpoints = controller['api-endpoints']
- self.juju_endpoint = endpoints[0]
- self.juju_ca_cert = controller['ca-cert']
-
- # Parse ~/.local/share/juju/accounts
- # controllers.testing.user|password
- print("Getting accounts")
- with open(os.path.expanduser(
- "~/.local/share/juju/accounts.yaml"
- )) as f:
- controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers['controllers'][cluster_uuid]
-
- self.juju_user = controller['user']
- self.juju_secret = controller['password']
-
- print("user: {}".format(self.juju_user))
- print("secret: {}".format(self.juju_secret))
- print("endpoint: {}".format(self.juju_endpoint))
- print("ca-cert: {}".format(self.juju_ca_cert))
-
- # raise Exception("EOL")
-
- self.juju_public_key = None
-
- config = {
- 'endpoint': self.juju_endpoint,
- 'username': self.juju_user,
- 'secret': self.juju_secret,
- 'cacert': self.juju_ca_cert,
- 'namespace': namespace,
- 'loadbalancer': loadbalancer,
- }
-
- # Store the cluster configuration so it
- # can be used for subsequent calls
- print("Setting config")
- await self.set_config(cluster_uuid, config)
-
- else:
- # This is an existing cluster, so get its config
- cluster_uuid = reuse_cluster_uuid
-
- config = self.get_config(cluster_uuid)
-
- self.juju_endpoint = config['endpoint']
- self.juju_user = config['username']
- self.juju_secret = config['secret']
- self.juju_ca_cert = config['cacert']
- self.juju_public_key = None
-
+ # This is a new cluster, so bootstrap it
+
+ cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
+
+ # Is a local k8s cluster?
+ localk8s = self.is_local_k8s(k8s_creds)
+
+ # If the k8s is external, the juju controller needs a loadbalancer
+ loadbalancer = False if localk8s else True
+
+ # Name the new k8s cloud
+ k8s_cloud = "k8s-{}".format(cluster_uuid)
+
+ self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
+ await self.add_k8s(k8s_cloud, k8s_creds)
+
+ # Bootstrap Juju controller
+ self.log.debug("Bootstrapping...")
+ await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
+ self.log.debug("Bootstrap done.")
+
+ # Get the controller information
+
+ # Parse ~/.local/share/juju/controllers.yaml
+ # controllers.testing.api-endpoints|ca-cert|uuid
+ self.log.debug("Getting controller endpoints")
+ with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
+ controllers = yaml.load(f, Loader=yaml.Loader)
+ controller = controllers["controllers"][cluster_uuid]
+ endpoints = controller["api-endpoints"]
+ juju_endpoint = endpoints[0]
+ juju_ca_cert = controller["ca-cert"]
+
+ # Parse ~/.local/share/juju/accounts
+ # controllers.testing.user|password
+ self.log.debug("Getting accounts")
+ with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
+ controllers = yaml.load(f, Loader=yaml.Loader)
+ controller = controllers["controllers"][cluster_uuid]
+
+ juju_user = controller["user"]
+ juju_secret = controller["password"]
+
+ config = {
+ "endpoint": juju_endpoint,
+ "username": juju_user,
+ "secret": juju_secret,
+ "cacert": juju_ca_cert,
+ "loadbalancer": loadbalancer,
+ }
+
+ # Store the cluster configuration so it
+ # can be used for subsequent calls
+ self.log.debug("Setting config")
+ await self.set_config(cluster_uuid, config)
+
+ # Test connection
+ controller = await self.get_controller(cluster_uuid)
+ await controller.disconnect()
+
+ # TODO: Remove these commented lines
+ # raise Exception("EOL")
+ # self.juju_public_key = None
# Login to the k8s cluster
- if not self.authenticated:
- await self.login(cluster_uuid)
+ # if not self.authenticated:
+ # await self.login(cluster_uuid)
# We're creating a new cluster
- print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
- model = await self.get_model(
- self.get_namespace(cluster_uuid),
- cluster_uuid=cluster_uuid
- )
+ # print("Getting model {}".format(self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid))
+ # model = await self.get_model(
+ # self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid
+ # )
# Disconnect from the model
- if model and model.is_connected():
- await model.disconnect()
+ # if model and model.is_connected():
+ # await model.disconnect()
return cluster_uuid, True
"""Repo Management"""
+
async def repo_add(
- self,
- name: str,
- url: str,
- type: str = "charm",
+ self, name: str, url: str, _type: str = "charm",
):
- raise NotImplemented()
+ raise MethodNotImplemented()
async def repo_list(self):
- raise NotImplemented()
+ raise MethodNotImplemented()
async def repo_remove(
- self,
- name: str,
+ self, name: str,
):
- raise NotImplemented()
+ raise MethodNotImplemented()
+
+ async def synchronize_repos(self, cluster_uuid: str, name: str):
+ """
+        Returns None as currently repo_add is not implemented
+ """
+ return None
"""Reset"""
+
async def reset(
- self,
- cluster_uuid: str,
- force: bool = False,
- uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
"""Reset a cluster
"""
try:
- if not self.authenticated:
- await self.login(cluster_uuid)
-
- if self.controller.is_connected():
- # Destroy the model
- namespace = self.get_namespace(cluster_uuid)
- if await self.has_model(namespace):
- print("[reset] Destroying model")
- await self.controller.destroy_model(
- namespace,
- destroy_storage=True
- )
- # Disconnect from the controller
- print("[reset] Disconnecting controller")
- await self.controller.disconnect()
+ # Remove k8scluster from database
+ self.log.debug("[reset] Removing k8scluster from juju database")
+ juju_db = self.db.get_one("admin", {"_id": "juju"})
+
+ for k in juju_db["k8sclusters"]:
+ if k["_id"] == cluster_uuid:
+ juju_db["k8sclusters"].remove(k)
+ self.db.set_one(
+ table="admin",
+ q_filter={"_id": "juju"},
+ update_dict={"k8sclusters": juju_db["k8sclusters"]},
+ )
+ break
- # Destroy the controller (via CLI)
- print("[reset] Destroying controller")
- await self.destroy_controller(cluster_uuid)
+ # Destroy the controller (via CLI)
+ self.log.debug("[reset] Destroying controller")
+ await self.destroy_controller(cluster_uuid)
- print("[reset] Removing k8s cloud")
- namespace = self.get_namespace(cluster_uuid)
- k8s_cloud = "{}-k8s".format(namespace)
- await self.remove_cloud(k8s_cloud)
+ self.log.debug("[reset] Removing k8s cloud")
+ k8s_cloud = "k8s-{}".format(cluster_uuid)
+ await self.remove_cloud(k8s_cloud)
except Exception as ex:
- print("Caught exception during reset: {}".format(ex))
+            self.log.error("Caught exception during reset: {}".format(ex))
+ return True
+ # TODO: Remove these commented lines
+ # if not self.authenticated:
+ # await self.login(cluster_uuid)
+
+ # if self.controller.is_connected():
+ # # Destroy the model
+ # namespace = self.get_namespace(cluster_uuid)
+ # if await self.has_model(namespace):
+ # self.log.debug("[reset] Destroying model")
+ # await self.controller.destroy_model(namespace, destroy_storage=True)
+
+ # # Disconnect from the controller
+ # self.log.debug("[reset] Disconnecting controller")
+ # await self.logout()
"""Deployment"""
timeout: float = 300,
params: dict = None,
db_dict: dict = None,
- kdu_name: str = None
+ kdu_name: str = None,
+ namespace: str = None,
) -> bool:
"""Install a bundle
to finish
:param params dict: Key-value pairs of instantiation parameters
:param kdu_name: Name of the KDU instance to be installed
+ :param namespace: K8s namespace to use for the KDU instance
:return: If successful, returns ?
"""
- if not self.authenticated:
- print("[install] Logging in to the controller")
- await self.login(cluster_uuid)
+ controller = await self.get_controller(cluster_uuid)
##
# Get or create the model, based on the NS
kdu_instance = db_dict["filter"]["_id"]
self.log.debug("Checking for model named {}".format(kdu_instance))
- model = await self.get_model(kdu_instance, cluster_uuid=cluster_uuid)
- if not model:
- # Create the new model
- self.log.debug("Adding model: {}".format(kdu_instance))
- model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
+
+ # Create the new model
+ self.log.debug("Adding model: {}".format(kdu_instance))
+ model = await self.add_model(
+ kdu_instance, cluster_uuid=cluster_uuid, controller=controller
+ )
if model:
# TODO: Instantiation parameters
"Juju bundle that models the KDU, in any of the following ways:
- <juju-repo>/<juju-bundle>
- <juju-bundle folder under k8s_models folder in the package>
- - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
+ - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
+ in the package>
- <URL_where_to_fetch_juju_bundle>
"""
+ try:
+ previous_workdir = os.getcwd()
+ except FileNotFoundError:
+ previous_workdir = "/app/storage"
bundle = kdu_model
if kdu_model.startswith("cs:"):
# Download the file
pass
else:
- # Local file
+                # str.strip() removes a *character set*, not a suffix, and would
+                # corrupt paths (e.g. "models/model.yaml" -> "s/"); use dirname
+                # to obtain the bundle's parent directory.
+                new_workdir = os.path.dirname(kdu_model)
- # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
- # Uncompress temporarily
- # bundle = <uncompressed file>
- pass
+ os.chdir(new_workdir)
+
+ bundle = "local:{}".format(kdu_model)
if not bundle:
# Raise named exception that the bundle could not be found
raise Exception()
- print("[install] deploying {}".format(bundle))
+ self.log.debug("[install] deploying {}".format(bundle))
await model.deploy(bundle)
# Get the application
if atomic:
# applications = model.applications
- print("[install] Applications: {}".format(model.applications))
+ self.log.debug("[install] Applications: {}".format(model.applications))
for name in model.applications:
- print("[install] Waiting for {} to settle".format(name))
+ self.log.debug("[install] Waiting for {} to settle".format(name))
application = model.applications[name]
try:
# It's not enough to wait for all units to be active;
# the application status needs to be active as well.
- print("Waiting for all units to be active...")
+ self.log.debug("Waiting for all units to be active...")
await model.block_until(
lambda: all(
- unit.agent_status == 'idle'
- and application.status in ['active', 'unknown']
- and unit.workload_status in [
- 'active', 'unknown'
- ] for unit in application.units
+ unit.agent_status == "idle"
+ and application.status in ["active", "unknown"]
+ and unit.workload_status in ["active", "unknown"]
+ for unit in application.units
),
- timeout=timeout
+ timeout=timeout,
)
- print("All units active.")
+ self.log.debug("All units active.")
+ # TODO use asyncio.TimeoutError
except concurrent.futures._base.TimeoutError:
- print("[install] Timeout exceeded; resetting cluster")
+ os.chdir(previous_workdir)
+ self.log.debug("[install] Timeout exceeded; resetting cluster")
await self.reset(cluster_uuid)
return False
# Wait for the application to be active
if model.is_connected():
- print("[install] Disconnecting model")
+ self.log.debug("[install] Disconnecting model")
await model.disconnect()
+ await controller.disconnect()
+ os.chdir(previous_workdir)
return kdu_instance
raise Exception("Unable to install")
- async def instances_list(
- self,
- cluster_uuid: str
- ) -> list:
+ async def instances_list(self, cluster_uuid: str) -> list:
"""
returns a list of deployed releases in a cluster
storage would require a redeployment of the service, at least in this
initial release.
"""
- namespace = self.get_namespace(cluster_uuid)
- model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
-
- with open(kdu_model, 'r') as f:
- bundle = yaml.safe_load(f)
-
- """
- {
- 'description': 'Test bundle',
- 'bundle': 'kubernetes',
- 'applications': {
- 'mariadb-k8s': {
- 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
- 'scale': 1,
- 'options': {
- 'password': 'manopw',
- 'root_password': 'osm4u',
- 'user': 'mano'
- },
- 'series': 'kubernetes'
- }
- }
- }
- """
- # TODO: This should be returned in an agreed-upon format
- for name in bundle['applications']:
- print(model.applications)
- application = model.applications[name]
- print(application)
-
- path = bundle['applications'][name]['charm']
-
- try:
- await application.upgrade_charm(switch=path)
- except juju.errors.JujuError as ex:
- if 'already running charm' in str(ex):
- # We're already running this version
- pass
-
- await model.disconnect()
-
- return True
- raise NotImplemented()
+ raise MethodNotImplemented()
+ # TODO: Remove these commented lines
+
+ # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
+
+ # model = None
+ # namespace = self.get_namespace(cluster_uuid)
+ # controller = await self.get_controller(cluster_uuid)
+
+ # try:
+ # if namespace not in await controller.list_models():
+ # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
+
+ # model = await controller.get_model(namespace)
+ # with open(kdu_model, "r") as f:
+ # bundle = yaml.safe_load(f)
+
+ # """
+ # {
+ # 'description': 'Test bundle',
+ # 'bundle': 'kubernetes',
+ # 'applications': {
+ # 'mariadb-k8s': {
+ # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
+ # 'scale': 1,
+ # 'options': {
+ # 'password': 'manopw',
+ # 'root_password': 'osm4u',
+ # 'user': 'mano'
+ # },
+ # 'series': 'kubernetes'
+ # }
+ # }
+ # }
+ # """
+ # # TODO: This should be returned in an agreed-upon format
+ # for name in bundle["applications"]:
+ # self.log.debug(model.applications)
+ # application = model.applications[name]
+ # self.log.debug(application)
+
+ # path = bundle["applications"][name]["charm"]
+
+ # try:
+ # await application.upgrade_charm(switch=path)
+ # except juju.errors.JujuError as ex:
+ # if "already running charm" in str(ex):
+ # # We're already running this version
+ # pass
+ # finally:
+ # if model:
+ # await model.disconnect()
+ # await controller.disconnect()
+ # return True
"""Rollback"""
+
async def rollback(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- revision: int = 0,
+ self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
) -> str:
"""Rollback a model
:return: If successful, returns the revision of active KDU instance,
or raises an exception
"""
- raise NotImplemented()
+ raise MethodNotImplemented()
"""Deletion"""
- async def uninstall(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ) -> bool:
+
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
"""Uninstall a KDU instance
:param cluster_uuid str: The UUID of the cluster
:return: Returns True if successful, or raises an exception
"""
- await self.controller.destroy_models(kdu_instance)
+
+ controller = await self.get_controller(cluster_uuid)
+
+ self.log.debug("[uninstall] Destroying model")
+
+ await controller.destroy_models(kdu_instance)
+
+ self.log.debug("[uninstall] Model destroyed and disconnecting")
+ await controller.disconnect()
return True
+ # TODO: Remove these commented lines
+ # if not self.authenticated:
+ # self.log.debug("[uninstall] Connecting to controller")
+ # await self.login(cluster_uuid)
- """Introspection"""
- async def inspect_kdu(
+ async def exec_primitive(
self,
- kdu_model: str,
- ) -> dict:
+ cluster_uuid: str = None,
+ kdu_instance: str = None,
+ primitive_name: str = None,
+ timeout: float = 300,
+ params: dict = None,
+ db_dict: dict = None,
+ ) -> str:
+ """Exec primitive (Juju action)
+
+ :param cluster_uuid str: The UUID of the cluster
+ :param kdu_instance str: The unique name of the KDU instance
+ :param primitive_name: Name of action that will be executed
+ :param timeout: Timeout for action execution
+ :param params: Dictionary of all the parameters needed for the action
+ :db_dict: Dictionary for any additional data
+
+ :return: Returns the output of the action
+ """
+
+ controller = await self.get_controller(cluster_uuid)
+
+ if not params or "application-name" not in params:
+ raise K8sException(
+                "Missing application-name argument, "
+                "argument needed for K8s actions"
+ )
+ try:
+ self.log.debug(
+ "[exec_primitive] Getting model "
+ "kdu_instance: {}".format(kdu_instance)
+ )
+
+ model = await self.get_model(kdu_instance, controller=controller)
+
+ application_name = params["application-name"]
+ application = model.applications[application_name]
+
+ actions = await application.get_actions()
+ if primitive_name not in actions:
+ raise K8sException("Primitive {} not found".format(primitive_name))
+
+ unit = None
+ for u in application.units:
+ if await u.is_leader_from_status():
+ unit = u
+ break
+
+ if unit is None:
+ raise K8sException("No leader unit found to execute action")
+
+ self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
+ action = await unit.run_action(primitive_name, **params)
+
+ output = await model.get_action_output(action_uuid=action.entity_id)
+ status = await model.get_action_status(uuid_or_prefix=action.entity_id)
+
+            status = status.get(action.entity_id, "failed")
+
+ if status != "completed":
+ raise K8sException(
+ "status is not completed: {} output: {}".format(status, output)
+ )
+
+ return output
+
+ except Exception as e:
+ error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
+ self.log.error(error_msg)
+ raise K8sException(message=error_msg)
+ finally:
+ await controller.disconnect()
+ # TODO: Remove these commented lines:
+ # if not self.authenticated:
+ # self.log.debug("[exec_primitive] Connecting to controller")
+ # await self.login(cluster_uuid)
+
+ """Introspection"""
+
+ async def inspect_kdu(self, kdu_model: str,) -> dict:
"""Inspect a KDU
Inspects a bundle and returns a dictionary of config parameters and
"""
kdu = {}
- with open(kdu_model, 'r') as f:
+ with open(kdu_model, "r") as f:
bundle = yaml.safe_load(f)
"""
}
"""
# TODO: This should be returned in an agreed-upon format
- kdu = bundle['applications']
+ kdu = bundle["applications"]
return kdu
- async def help_kdu(
- self,
- kdu_model: str,
- ) -> str:
+ async def help_kdu(self, kdu_model: str,) -> str:
"""View the README
If available, returns the README of the bundle.
"""
readme = None
- files = ['README', 'README.txt', 'README.md']
+ files = ["README", "README.txt", "README.md"]
path = os.path.dirname(kdu_model)
for file in os.listdir(path):
if file in files:
- with open(file, 'r') as f:
+ with open(file, "r") as f:
readme = f.read()
break
return readme
- async def status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- ) -> dict:
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
"""Get the status of the KDU
Get the current status of the KDU instance.
and deployment_time.
"""
status = {}
+ controller = await self.get_controller(cluster_uuid)
+ model = await self.get_model(kdu_instance, controller=controller)
- model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
-
- # model = await self.get_model_by_uuid(cluster_uuid)
- if model:
- model_status = await model.get_status()
- status = model_status.applications
+ model_status = await model.get_status()
+ status = model_status.applications
- for name in model_status.applications:
- application = model_status.applications[name]
- status[name] = {
- 'status': application['status']['status']
- }
+ for name in model_status.applications:
+ application = model_status.applications[name]
+ status[name] = {"status": application["status"]["status"]}
- if model.is_connected():
- await model.disconnect()
+ await model.disconnect()
+ await controller.disconnect()
return status
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
+ """Return a list of services of a kdu_instance"""
+
+ credentials = self.get_credentials(cluster_uuid=cluster_uuid)
+
+        # NOTE(review): kubeconfig credentials are written to a predictable,
+        # world-readable path under /tmp — consider tempfile.mkdtemp() with
+        # restrictive permissions instead.
+        config_path = "/tmp/{}".format(cluster_uuid)
+        config_file = "{}/config".format(config_path)
+
+ if not os.path.exists(config_path):
+ os.makedirs(config_path)
+ with open(config_file, "w") as f:
+ f.write(credentials)
+
+        kubectl = Kubectl(config_file=config_file)
+        # NOTE(review): the `namespace` parameter is unused; the selector
+        # filters on kdu_instance (the juju model name, which appears to double
+        # as the k8s namespace) — confirm this is intended.
+        return kubectl.get_services(
+            field_selector="metadata.namespace={}".format(kdu_instance)
+        )
+
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
+ """Return data for a specific service inside a namespace"""
+
+ credentials = self.get_credentials(cluster_uuid=cluster_uuid)
+
+ config_path = "/tmp/{}".format(cluster_uuid)
+ config_file = "{}/config".format(config_path)
+
+ if not os.path.exists(config_path):
+ os.makedirs(config_path)
+ with open(config_file, "w") as f:
+ f.write(credentials)
+
+ kubectl = Kubectl(config_file=config_file)
+
+ return kubectl.get_services(
+ field_selector="metadata.name={},metadata.namespace={}".format(
+ service_name, namespace
+ )
+ )[0]
+
# Private methods
- async def add_k8s(
- self,
- cloud_name: str,
- credentials: str,
- ) -> bool:
+ async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
"""Add a k8s cloud to Juju
Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
"""
cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
- print(cmd)
+ self.log.debug(cmd)
process = await asyncio.create_subprocess_exec(
*cmd,
await process.stdin.drain()
process.stdin.close()
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
- print("add-k8s return code: {}".format(return_code))
+ self.log.debug("add-k8s return code: {}".format(return_code))
if return_code > 0:
raise Exception(stderr)
return True
async def add_model(
- self,
- model_name: str,
- cluster_uuid: str,
- ) -> juju.model.Model:
+ self, model_name: str, cluster_uuid: str, controller: Controller
+ ) -> Model:
"""Adds a model to the controller
Adds a new model to the Juju controller
:param model_name str: The name of the model to add.
+ :param cluster_uuid str: ID of the cluster.
+ :param controller: Controller object in which the model will be added
:returns: The juju.model.Model object of the new model upon success or
raises an exception.
"""
- if not self.authenticated:
- await self.login(cluster_uuid)
- self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
- model = await self.controller.add_model(
- model_name,
- config={'authorized-keys': self.juju_public_key}
+ self.log.debug(
+ "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
)
+ model = None
+ try:
+ if self.juju_public_key is not None:
+ model = await controller.add_model(
+ model_name, config={"authorized-keys": self.juju_public_key}
+ )
+ else:
+ model = await controller.add_model(model_name)
+ except Exception as ex:
+            # NOTE(review): the exception is swallowed and None is returned —
+            # callers must handle a missing model.
+            self.log.debug("Caught exception: {}".format(ex))
+
return model
async def bootstrap(
- self,
- cloud_name: str,
- cluster_uuid: str,
- loadbalancer: bool
+ self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
) -> bool:
"""Bootstrap a Kubernetes controller
cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
else:
"""
- For public clusters, specify that the controller service is using a LoadBalancer.
+ For public clusters, specify that the controller service is using a
+ LoadBalancer.
"""
- cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]
-
- print("Bootstrapping controller {} in cloud {}".format(
- cluster_uuid, cloud_name
- ))
+ cmd = [
+ self.juju_command,
+ "bootstrap",
+ cloud_name,
+ cluster_uuid,
+ "--config",
+ "controller-service-type=loadbalancer",
+ ]
+
+ self.log.debug(
+ "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
+ )
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
if return_code > 0:
#
- if b'already exists' not in stderr:
+ if b"already exists" not in stderr:
raise Exception(stderr)
return True
- async def destroy_controller(
- self,
- cluster_uuid: str
- ) -> bool:
+ async def destroy_controller(self, cluster_uuid: str) -> bool:
"""Destroy a Kubernetes controller
Destroy an existing Kubernetes controller.
"--destroy-all-models",
"--destroy-storage",
"-y",
- cluster_uuid
+ cluster_uuid,
]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
if return_code > 0:
#
- if 'already exists' not in stderr:
+ # stderr is bytes (asyncio communicate() returns bytes); a str
+ # needle would raise TypeError, so the literal must be bytes too
+ # (same as the add-k8s/bootstrap check above).
+ if b"already exists" not in stderr:
raise Exception(stderr)
- def get_config(
- self,
- cluster_uuid: str,
- ) -> dict:
+ def get_credentials(self, cluster_uuid: str) -> str:
+ """
+ Get Cluster Kubeconfig
+ """
+ k8scluster = self.db.get_one(
+ "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
+ )
+
+ self.db.encrypt_decrypt_fields(
+ k8scluster.get("credentials"),
+ "decrypt",
+ ["password", "secret"],
+ schema_version=k8scluster["schema_version"],
+ salt=k8scluster["_id"],
+ )
+
+ return yaml.safe_dump(k8scluster.get("credentials"))
+
+ def get_config(self, cluster_uuid: str,) -> dict:
"""Get the cluster configuration
Gets the configuration of the cluster
:param cluster_uuid str: The UUID of the cluster.
:return: A dict upon success, or raises an exception.
"""
- cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
- if os.path.exists(cluster_config):
- with open(cluster_config, 'r') as f:
- config = yaml.safe_load(f.read())
- return config
- else:
- raise Exception(
- "Unable to locate configuration for cluster {}".format(
- cluster_uuid
+
+ juju_db = self.db.get_one("admin", {"_id": "juju"})
+ config = None
+ for k in juju_db["k8sclusters"]:
+ if k["_id"] == cluster_uuid:
+ config = k["config"]
+ self.db.encrypt_decrypt_fields(
+ config,
+ "decrypt",
+ ["secret", "cacert"],
+ schema_version="1.1",
+ salt=k["_id"],
)
+ break
+ if not config:
+ raise Exception(
+ "Unable to locate configuration for cluster {}".format(cluster_uuid)
)
+ return config
- async def get_model(
- self,
- model_name: str,
- cluster_uuid: str,
- ) -> juju.model.Model:
+ async def get_model(self, model_name: str, controller: Controller) -> Model:
"""Get a model from the Juju Controller.
Note: Model objects returned must call disconnected() before it goes
out of scope.
:param model_name str: The name of the model to get
+ :param controller Controller: Controller object
:return The juju.model.Model object if found, or None.
"""
- if not self.authenticated:
- await self.login(cluster_uuid)
- model = None
- models = await self.controller.list_models()
- self.log.debug(models)
- if model_name in models:
- self.log.debug("Found model: {}".format(model_name))
- model = await self.controller.get_model(
- model_name
- )
- return model
+ models = await controller.list_models()
+ if model_name not in models:
+ raise N2VCNotFound("Model {} not found".format(model_name))
+ self.log.debug("Found model: {}".format(model_name))
+ return await controller.get_model(model_name)
- def get_namespace(
- self,
- cluster_uuid: str,
- ) -> str:
+ def get_namespace(self, cluster_uuid: str,) -> str:
"""Get the namespace UUID
Gets the namespace's unique name
config = self.get_config(cluster_uuid)
# Make sure the name is in the config
- if 'namespace' not in config:
+ if "namespace" not in config:
raise Exception("Namespace not found.")
# TODO: We want to make sure this is unique to the cluster, in case
# the cluster is being reused.
# Consider pre/appending the cluster id to the namespace string
- return config['namespace']
+ return config["namespace"]
- async def has_model(
- self,
- model_name: str
- ) -> bool:
- """Check if a model exists in the controller
+ # TODO: Remove these lines of code
+ # async def has_model(self, model_name: str) -> bool:
+ # """Check if a model exists in the controller
- Checks to see if a model exists in the connected Juju controller.
+ # Checks to see if a model exists in the connected Juju controller.
- :param model_name str: The name of the model
- :return: A boolean indicating if the model exists
- """
- models = await self.controller.list_models()
+ # :param model_name str: The name of the model
+ # :return: A boolean indicating if the model exists
+ # """
+ # models = await self.controller.list_models()
- if model_name in models:
- return True
- return False
+ # if model_name in models:
+ # return True
+ # return False
- def is_local_k8s(
- self,
- credentials: str,
- ) -> bool:
+ def is_local_k8s(self, credentials: str,) -> bool:
"""Check if a cluster is local
Checks if a cluster is running in the local host
:param credentials dict: A dictionary containing the k8s credentials
:returns: A boolean if the cluster is running locally
"""
+
creds = yaml.safe_load(credentials)
- if os.getenv("OSMLCM_VCA_APIPROXY"):
- host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
- if creds and host_ip:
- for cluster in creds['clusters']:
- if 'server' in cluster['cluster']:
- if host_ip in cluster['cluster']['server']:
+ if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
+ for cluster in creds["clusters"]:
+ if "server" in cluster["cluster"]:
+ if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
return True
return False
- async def login(self, cluster_uuid):
+ async def get_controller(self, cluster_uuid):
"""Login to the Juju controller."""
- if self.authenticated:
- return
-
- self.connecting = True
-
- # Test: Make sure we have the credentials loaded
config = self.get_config(cluster_uuid)
- self.juju_endpoint = config['endpoint']
- self.juju_user = config['username']
- self.juju_secret = config['secret']
- self.juju_ca_cert = config['cacert']
- self.juju_public_key = None
+ juju_endpoint = config["endpoint"]
+ juju_user = config["username"]
+ juju_secret = config["secret"]
+ juju_ca_cert = config["cacert"]
- self.controller = Controller()
+ controller = Controller()
- if self.juju_secret:
+ if juju_secret:
self.log.debug(
- "Connecting to controller... ws://{} as {}/{}".format(
- self.juju_endpoint,
- self.juju_user,
- self.juju_secret,
+ "Connecting to controller... ws://{} as {}".format(
+ juju_endpoint, juju_user,
)
)
try:
- await self.controller.connect(
- endpoint=self.juju_endpoint,
- username=self.juju_user,
- password=self.juju_secret,
- cacert=self.juju_ca_cert,
+ await controller.connect(
+ endpoint=juju_endpoint,
+ username=juju_user,
+ password=juju_secret,
+ cacert=juju_ca_cert,
)
- self.authenticated = True
self.log.debug("JujuApi: Logged into controller")
+ return controller
except Exception as ex:
- print(ex)
self.log.debug("Caught exception: {}".format(ex))
- pass
+ # Propagate: falling through would implicitly return None and the
+ # caller would fail with AttributeError far from the real error.
+ raise
else:
self.log.fatal("VCA credentials not configured.")
- self.authenticated = False
-
- async def logout(self):
- """Logout of the Juju controller."""
- print("[logout]")
- if not self.authenticated:
- return False
-
- for model in self.models:
- print("Logging out of model {}".format(model))
- await self.models[model].disconnect()
-
- if self.controller:
- self.log.debug("Disconnecting controller {}".format(
- self.controller
- ))
- await self.controller.disconnect()
- self.controller = None
-
- self.authenticated = False
- async def remove_cloud(
- self,
- cloud_name: str,
- ) -> bool:
+ # TODO: Remove these commented lines
+ # self.authenticated = False
+ # if self.authenticated:
+ # return
+
+ # self.connecting = True
+ # juju_public_key = None
+ # self.authenticated = True
+ # Test: Make sure we have the credentials loaded
+ # async def logout(self):
+ # """Logout of the Juju controller."""
+ # self.log.debug("[logout]")
+ # if not self.authenticated:
+ # return False
+
+ # for model in self.models:
+ # self.log.debug("Logging out of model {}".format(model))
+ # await self.models[model].disconnect()
+
+ # if self.controller:
+ # self.log.debug("Disconnecting controller {}".format(self.controller))
+ # await self.controller.disconnect()
+ # self.controller = None
+
+ # self.authenticated = False
+
+ async def remove_cloud(self, cloud_name: str,) -> bool:
"""Remove a k8s cloud from Juju
Removes a Kubernetes cloud from Juju.
# Remove the bootstrapped controller
cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
# Remove the cloud from the local config
cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
return True
- async def set_config(
- self,
- cluster_uuid: str,
- config: dict,
- ) -> bool:
+ async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
"""Save the cluster configuration
- Saves the cluster information to the file store
+ Saves the cluster information to the Mongo database
:param cluster_uuid str: The UUID of the cluster
:param config dict: A dictionary containing the cluster configuration
- :returns: Boolean upon success or raises an exception.
"""
- cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
- if not os.path.exists(cluster_config):
- print("Writing config to {}".format(cluster_config))
- with open(cluster_config, 'w') as f:
- f.write(yaml.dump(config, Dumper=yaml.Dumper))
+ juju_db = self.db.get_one("admin", {"_id": "juju"})
- return True
+ k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
+ self.db.encrypt_decrypt_fields(
+ config,
+ "encrypt",
+ ["secret", "cacert"],
+ schema_version="1.1",
+ salt=cluster_uuid,
+ )
+ k8sclusters.append({"_id": cluster_uuid, "config": config})
+ self.db.set_one(
+ table="admin",
+ q_filter={"_id": "juju"},
+ update_dict={"k8sclusters": k8sclusters},
+ )