import asyncio
import concurrent
-from .exceptions import NotImplemented
+import os
+import uuid
+import yaml
-import io
import juju
-# from juju.bundle import BundleHandler
from juju.controller import Controller
from juju.model import Model
-from juju.errors import JujuAPIError, JujuError
from n2vc.exceptions import K8sException
-
from n2vc.k8s_conn import K8sConnector
-import os
+from .exceptions import MethodNotImplemented
+
+
+# from juju.bundle import BundleHandler
# import re
# import ssl
# from .vnf import N2VC
-
-import uuid
-import yaml
-
-
class K8sJujuConnector(K8sConnector):
-
def __init__(
- self,
- fs: object,
- db: object,
- kubectl_command: str = '/usr/bin/kubectl',
- juju_command: str = '/usr/bin/juju',
- log: object = None,
- on_update_db=None,
+ self,
+ fs: object,
+ db: object,
+ kubectl_command: str = "/usr/bin/kubectl",
+ juju_command: str = "/usr/bin/juju",
+ log: object = None,
+ on_update_db=None,
):
"""
# parent class
K8sConnector.__init__(
- self,
- db,
- log=log,
- on_update_db=on_update_db,
+ self, db, log=log, on_update_db=on_update_db,
)
self.fs = fs
- self.log.debug('Initializing K8S Juju connector')
+ self.log.debug("Initializing K8S Juju connector")
self.authenticated = False
self.models = {}
self.juju_command = juju_command
self.juju_secret = ""
- self.log.debug('K8S Juju connector initialized')
+ self.log.debug("K8S Juju connector initialized")
"""Initialization"""
+
async def init_env(
self,
k8s_creds: str,
- namespace: str = 'kube-system',
+ namespace: str = "kube-system",
reuse_cluster_uuid: str = None,
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Juju bundles.
- :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
- :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
+ '.kube/config'
+ :param namespace: optional namespace to be used for juju. By default,
+ 'kube-system' will be used
:param reuse_cluster_uuid: existing cluster uuid for reuse
- :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
- (on error, an exception will be raised)
+ :return: uuid of the K8s cluster and True if connector has installed some
+ software in the cluster
+ (on error, an exception will be raised)
"""
"""Bootstrapping
# Parse ~/.local/share/juju/controllers.yaml
# controllers.testing.api-endpoints|ca-cert|uuid
self.log.debug("Getting controller endpoints")
- with open(os.path.expanduser(
- "~/.local/share/juju/controllers.yaml"
- )) as f:
+ with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers['controllers'][cluster_uuid]
- endpoints = controller['api-endpoints']
+ controller = controllers["controllers"][cluster_uuid]
+ endpoints = controller["api-endpoints"]
self.juju_endpoint = endpoints[0]
- self.juju_ca_cert = controller['ca-cert']
+ self.juju_ca_cert = controller["ca-cert"]
# Parse ~/.local/share/juju/accounts
# controllers.testing.user|password
self.log.debug("Getting accounts")
- with open(os.path.expanduser(
- "~/.local/share/juju/accounts.yaml"
- )) as f:
+ with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers['controllers'][cluster_uuid]
+ controller = controllers["controllers"][cluster_uuid]
- self.juju_user = controller['user']
- self.juju_secret = controller['password']
+ self.juju_user = controller["user"]
+ self.juju_secret = controller["password"]
# raise Exception("EOL")
self.juju_public_key = None
config = {
- 'endpoint': self.juju_endpoint,
- 'username': self.juju_user,
- 'secret': self.juju_secret,
- 'cacert': self.juju_ca_cert,
- 'namespace': namespace,
- 'loadbalancer': loadbalancer,
+ "endpoint": self.juju_endpoint,
+ "username": self.juju_user,
+ "secret": self.juju_secret,
+ "cacert": self.juju_ca_cert,
+ "namespace": namespace,
+ "loadbalancer": loadbalancer,
}
# Store the cluster configuration so it
config = self.get_config(cluster_uuid)
- self.juju_endpoint = config['endpoint']
- self.juju_user = config['username']
- self.juju_secret = config['secret']
- self.juju_ca_cert = config['cacert']
+ self.juju_endpoint = config["endpoint"]
+ self.juju_user = config["username"]
+ self.juju_secret = config["secret"]
+ self.juju_ca_cert = config["cacert"]
self.juju_public_key = None
# Login to the k8s cluster
await self.login(cluster_uuid)
# We're creating a new cluster
- #print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
- #model = await self.get_model(
+ # print("Getting model {}".format(self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid))
+ # model = await self.get_model(
# self.get_namespace(cluster_uuid),
# cluster_uuid=cluster_uuid
- #)
+ # )
- ## Disconnect from the model
- #if model and model.is_connected():
+ # Disconnect from the model
+ # if model and model.is_connected():
# await model.disconnect()
return cluster_uuid, True
"""Repo Management"""
+
async def repo_add(
- self,
- name: str,
- url: str,
- type: str = "charm",
+ self, name: str, url: str, _type: str = "charm",
):
- raise NotImplemented()
+ raise MethodNotImplemented()
async def repo_list(self):
- raise NotImplemented()
+ raise MethodNotImplemented()
async def repo_remove(
- self,
- name: str,
+ self, name: str,
):
- raise NotImplemented()
+ raise MethodNotImplemented()
- async def synchronize_repos(
- self,
- cluster_uuid: str,
- name: str
- ):
+ async def synchronize_repos(self, cluster_uuid: str, name: str):
"""
Returns None as currently add_repo is not implemented
"""
return None
"""Reset"""
+
async def reset(
- self,
- cluster_uuid: str,
- force: bool = False,
- uninstall_sw: bool = False
+ self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
"""Reset a cluster
namespace = self.get_namespace(cluster_uuid)
if await self.has_model(namespace):
self.log.debug("[reset] Destroying model")
- await self.controller.destroy_model(
- namespace,
- destroy_storage=True
- )
+ await self.controller.destroy_model(namespace, destroy_storage=True)
# Disconnect from the controller
self.log.debug("[reset] Disconnecting controller")
params: dict = None,
db_dict: dict = None,
kdu_name: str = None,
- namespace: str = None
+ namespace: str = None,
) -> bool:
"""Install a bundle
"Juju bundle that models the KDU, in any of the following ways:
- <juju-repo>/<juju-bundle>
- <juju-bundle folder under k8s_models folder in the package>
- - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
+ - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
+ in the package>
- <URL_where_to_fetch_juju_bundle>
"""
-
- previous_workdir = os.getcwd()
+ try:
+ previous_workdir = os.getcwd()
+ except FileNotFoundError:
+ previous_workdir = "/app/storage"
bundle = kdu_model
if kdu_model.startswith("cs:"):
self.log.debug("Waiting for all units to be active...")
await model.block_until(
lambda: all(
- unit.agent_status == 'idle'
- and application.status in ['active', 'unknown']
- and unit.workload_status in [
- 'active', 'unknown'
- ] for unit in application.units
+ unit.agent_status == "idle"
+ and application.status in ["active", "unknown"]
+ and unit.workload_status in ["active", "unknown"]
+ for unit in application.units
),
- timeout=timeout
+ timeout=timeout,
)
self.log.debug("All units active.")
- except concurrent.futures._base.TimeoutError: # TODO use asyncio.TimeoutError
+ # TODO use asyncio.TimeoutError
+ except concurrent.futures._base.TimeoutError:
os.chdir(previous_workdir)
self.log.debug("[install] Timeout exceeded; resetting cluster")
await self.reset(cluster_uuid)
return kdu_instance
raise Exception("Unable to install")
- async def instances_list(
- self,
- cluster_uuid: str
- ) -> list:
+ async def instances_list(self, cluster_uuid: str) -> list:
"""
returns a list of deployed releases in a cluster
namespace = self.get_namespace(cluster_uuid)
model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
- with open(kdu_model, 'r') as f:
+ with open(kdu_model, "r") as f:
bundle = yaml.safe_load(f)
"""
}
"""
# TODO: This should be returned in an agreed-upon format
- for name in bundle['applications']:
+ for name in bundle["applications"]:
self.log.debug(model.applications)
application = model.applications[name]
self.log.debug(application)
- path = bundle['applications'][name]['charm']
+ path = bundle["applications"][name]["charm"]
try:
await application.upgrade_charm(switch=path)
except juju.errors.JujuError as ex:
- if 'already running charm' in str(ex):
+ if "already running charm" in str(ex):
# We're already running this version
pass
await model.disconnect()
return True
- raise NotImplemented()
+ raise MethodNotImplemented()
"""Rollback"""
+
async def rollback(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- revision: int = 0,
+ self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
) -> str:
"""Rollback a model
:return: If successful, returns the revision of active KDU instance,
or raises an exception
"""
- raise NotImplemented()
+ raise MethodNotImplemented()
"""Deletion"""
- async def uninstall(
- self,
- cluster_uuid: str,
- kdu_instance: str
- ) -> bool:
+
+ async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
"""Uninstall a KDU instance
:param cluster_uuid str: The UUID of the cluster
await self.login(cluster_uuid)
if not params or "application-name" not in params:
- raise K8sException("Missing application-name argument, \
- argument needed for K8s actions")
+ raise K8sException(
+ "Missing application-name argument, \
+ argument needed for K8s actions"
+ )
try:
- self.log.debug("[exec_primitive] Getting model "
- "kdu_instance: {}".format(kdu_instance))
+ self.log.debug(
+ "[exec_primitive] Getting model "
+ "kdu_instance: {}".format(kdu_instance)
+ )
model = await self.get_model(kdu_instance, cluster_uuid)
)
if status != "completed":
- raise K8sException("status is not completed: {} output: {}".format(status, output))
+ raise K8sException(
+ "status is not completed: {} output: {}".format(status, output)
+ )
return output
raise K8sException(message=error_msg)
"""Introspection"""
- async def inspect_kdu(
- self,
- kdu_model: str,
- ) -> dict:
+
+ async def inspect_kdu(self, kdu_model: str,) -> dict:
"""Inspect a KDU
Inspects a bundle and returns a dictionary of config parameters and
"""
kdu = {}
- with open(kdu_model, 'r') as f:
+ with open(kdu_model, "r") as f:
bundle = yaml.safe_load(f)
"""
}
"""
# TODO: This should be returned in an agreed-upon format
- kdu = bundle['applications']
+ kdu = bundle["applications"]
return kdu
- async def help_kdu(
- self,
- kdu_model: str,
- ) -> str:
+ async def help_kdu(self, kdu_model: str,) -> str:
"""View the README
If available, returns the README of the bundle.
"""
readme = None
- files = ['README', 'README.txt', 'README.md']
+ files = ["README", "README.txt", "README.md"]
path = os.path.dirname(kdu_model)
for file in os.listdir(path):
if file in files:
- with open(file, 'r') as f:
+ with open(file, "r") as f:
readme = f.read()
break
return readme
- async def status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- ) -> dict:
+ async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
"""Get the status of the KDU
Get the current status of the KDU instance.
"""
status = {}
- model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
+ model = await self.get_model(
+ self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid
+ )
# model = await self.get_model_by_uuid(cluster_uuid)
if model:
for name in model_status.applications:
application = model_status.applications[name]
- status[name] = {
- 'status': application['status']['status']
- }
+ status[name] = {"status": application["status"]["status"]}
if model.is_connected():
await model.disconnect()
return status
+ async def get_services(self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ namespace: str = None) -> list:
+        """
+        Not implemented: raises MethodNotImplemented when called
+        """
+        raise MethodNotImplemented()
+
+ async def get_service(self,
+ cluster_uuid: str,
+ service_name: str,
+ namespace: str = None) -> object:
+        """
+        Not implemented: raises MethodNotImplemented when called
+        """
+        raise MethodNotImplemented()
+
# Private methods
- async def add_k8s(
- self,
- cloud_name: str,
- credentials: str,
- ) -> bool:
+ async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
"""Add a k8s cloud to Juju
Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
await process.stdin.drain()
process.stdin.close()
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
return True
- async def add_model(
- self,
- model_name: str,
- cluster_uuid: str,
- ) -> juju.model.Model:
+ async def add_model(self, model_name: str, cluster_uuid: str,) -> Model:
"""Adds a model to the controller
Adds a new model to the Juju controller
if not self.authenticated:
await self.login(cluster_uuid)
- self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
+ self.log.debug(
+ "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
+ )
try:
- model = await self.controller.add_model(
- model_name,
- config={'authorized-keys': self.juju_public_key}
- )
+ if self.juju_public_key is not None:
+ model = await self.controller.add_model(
+ model_name, config={"authorized-keys": self.juju_public_key}
+ )
+ else:
+ model = await self.controller.add_model(model_name)
except Exception as ex:
self.log.debug(ex)
self.log.debug("Caught exception: {}".format(ex))
return model
async def bootstrap(
- self,
- cloud_name: str,
- cluster_uuid: str,
- loadbalancer: bool
+ self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
) -> bool:
"""Bootstrap a Kubernetes controller
cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
else:
"""
- For public clusters, specify that the controller service is using a LoadBalancer.
+ For public clusters, specify that the controller service is using a
+ LoadBalancer.
"""
- cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]
-
- self.log.debug("Bootstrapping controller {} in cloud {}".format(
- cluster_uuid, cloud_name
- ))
+ cmd = [
+ self.juju_command,
+ "bootstrap",
+ cloud_name,
+ cluster_uuid,
+ "--config",
+ "controller-service-type=loadbalancer",
+ ]
+
+ self.log.debug(
+ "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
+ )
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
if return_code > 0:
#
- if b'already exists' not in stderr:
+ if b"already exists" not in stderr:
raise Exception(stderr)
return True
- async def destroy_controller(
- self,
- cluster_uuid: str
- ) -> bool:
+ async def destroy_controller(self, cluster_uuid: str) -> bool:
"""Destroy a Kubernetes controller
Destroy an existing Kubernetes controller.
"--destroy-all-models",
"--destroy-storage",
"-y",
- cluster_uuid
+ cluster_uuid,
]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
if return_code > 0:
#
- if 'already exists' not in stderr:
+ if "already exists" not in stderr:
raise Exception(stderr)
- def get_config(
- self,
- cluster_uuid: str,
- ) -> dict:
+ def get_config(self, cluster_uuid: str,) -> dict:
"""Get the cluster configuration
Gets the configuration of the cluster
"""
cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
if os.path.exists(cluster_config):
- with open(cluster_config, 'r') as f:
+ with open(cluster_config, "r") as f:
config = yaml.safe_load(f.read())
return config
else:
raise Exception(
- "Unable to locate configuration for cluster {}".format(
- cluster_uuid
- )
+ "Unable to locate configuration for cluster {}".format(cluster_uuid)
)
- async def get_model(
- self,
- model_name: str,
- cluster_uuid: str,
- ) -> juju.model.Model:
+ async def get_model(self, model_name: str, cluster_uuid: str,) -> Model:
"""Get a model from the Juju Controller.
Note: Model objects returned must call disconnected() before it goes
models = await self.controller.list_models()
if model_name in models:
self.log.debug("Found model: {}".format(model_name))
- model = await self.controller.get_model(
- model_name
- )
+ model = await self.controller.get_model(model_name)
return model
- def get_namespace(
- self,
- cluster_uuid: str,
- ) -> str:
+ def get_namespace(self, cluster_uuid: str,) -> str:
"""Get the namespace UUID
Gets the namespace's unique name
config = self.get_config(cluster_uuid)
# Make sure the name is in the config
- if 'namespace' not in config:
+ if "namespace" not in config:
raise Exception("Namespace not found.")
# TODO: We want to make sure this is unique to the cluster, in case
# the cluster is being reused.
# Consider pre/appending the cluster id to the namespace string
- return config['namespace']
+ return config["namespace"]
- async def has_model(
- self,
- model_name: str
- ) -> bool:
+ async def has_model(self, model_name: str) -> bool:
"""Check if a model exists in the controller
Checks to see if a model exists in the connected Juju controller.
return True
return False
- def is_local_k8s(
- self,
- credentials: str,
- ) -> bool:
+ def is_local_k8s(self, credentials: str,) -> bool:
"""Check if a cluster is local
Checks if a cluster is running in the local host
host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
if creds and host_ip:
- for cluster in creds['clusters']:
- if 'server' in cluster['cluster']:
- if host_ip in cluster['cluster']['server']:
+ for cluster in creds["clusters"]:
+ if "server" in cluster["cluster"]:
+ if host_ip in cluster["cluster"]["server"]:
return True
return False
# Test: Make sure we have the credentials loaded
config = self.get_config(cluster_uuid)
- self.juju_endpoint = config['endpoint']
- self.juju_user = config['username']
- self.juju_secret = config['secret']
- self.juju_ca_cert = config['cacert']
+ self.juju_endpoint = config["endpoint"]
+ self.juju_user = config["username"]
+ self.juju_secret = config["secret"]
+ self.juju_ca_cert = config["cacert"]
self.juju_public_key = None
self.controller = Controller()
if self.juju_secret:
self.log.debug(
"Connecting to controller... ws://{} as {}/{}".format(
- self.juju_endpoint,
- self.juju_user,
- self.juju_secret,
+ self.juju_endpoint, self.juju_user, self.juju_secret,
)
)
try:
await self.models[model].disconnect()
if self.controller:
- self.log.debug("Disconnecting controller {}".format(
- self.controller
- ))
+ self.log.debug("Disconnecting controller {}".format(self.controller))
await self.controller.disconnect()
self.controller = None
self.authenticated = False
- async def remove_cloud(
- self,
- cloud_name: str,
- ) -> bool:
+ async def remove_cloud(self, cloud_name: str,) -> bool:
"""Remove a k8s cloud from Juju
Removes a Kubernetes cloud from Juju.
# Remove the bootstrapped controller
cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
# Remove the cloud from the local config
cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
process = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
+ *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
- stdout, stderr = await process.communicate()
+ _stdout, stderr = await process.communicate()
return_code = process.returncode
return True
- async def set_config(
- self,
- cluster_uuid: str,
- config: dict,
- ) -> bool:
+ async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
"""Save the cluster configuration
Saves the cluster information to the file store
cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
if not os.path.exists(cluster_config):
self.log.debug("Writing config to {}".format(cluster_config))
- with open(cluster_config, 'w') as f:
+ with open(cluster_config, "w") as f:
f.write(yaml.dump(config, Dumper=yaml.Dumper))
return True