class K8sJujuConnector(K8sConnector):
+
def __init__(
self,
- fs,
- kubectl_command='/usr/bin/kubectl',
- log=None
+ fs: object,
+ db: object,
+ kubectl_command: str = '/usr/bin/kubectl',
+ juju_command: str = '/usr/bin/juju',
+ log=None,
+ on_update_db=None,
):
"""
# parent class
K8sConnector.__init__(
self,
- kubectl_command=kubectl_command,
- fs=fs,
+ db,
log=log,
+ on_update_db=on_update_db,
)
+ self.fs = fs
self.info('Initializing K8S Juju connector')
self.authenticated = False
self.models = {}
self.log = logging.getLogger(__name__)
+
+ self.juju_secret = ""
+
self.info('K8S Juju connector initialized')
"""Initialization"""
async def init_env(
self,
- k8s_creds: dict,
+ k8s_creds: str,
namespace: str = 'kube-system',
reuse_cluster_uuid: str = None,
- ) -> str:
+ ) -> (str, bool):
"""Initialize a Kubernetes environment
:param k8s_creds dict: A dictionary containing the Kubernetes cluster
use the CLI tools.
"""
# TODO: The path may change
- jujudir = "/snap/bin"
+ jujudir = "/usr/local/bin"
self.k8scli = "{}/juju".format(jujudir)
# Add k8s cloud to Juju (unless it's microk8s)
+ # Convert to a dict
+ # k8s_creds = yaml.safe_load(k8s_creds)
+
# Does the kubeconfig contain microk8s?
microk8s = self.is_microk8s_by_credentials(k8s_creds)
# Name the new k8s cloud
k8s_cloud = "{}-k8s".format(namespace)
+ print("Adding k8s cloud {}".format(k8s_cloud))
await self.add_k8s(k8s_cloud, k8s_creds)
# Bootstrap Juju controller
- self.bootstrap(k8s_cloud, cluster_uuid)
+ print("Bootstrapping...")
+ await self.bootstrap(k8s_cloud, cluster_uuid)
+ print("Bootstrap done.")
else:
# k8s_cloud = 'microk8s-test'
k8s_cloud = "{}-k8s".format(namespace)
# Parse ~/.local/share/juju/controllers.yaml
# controllers.testing.api-endpoints|ca-cert|uuid
+ print("Getting controller endpoints")
with open(os.path.expanduser(
"~/.local/share/juju/controllers.yaml"
)) as f:
# Parse ~/.local/share/juju/accounts
# controllers.testing.user|password
+ print("Getting accounts")
with open(os.path.expanduser(
"~/.local/share/juju/accounts.yaml"
)) as f:
# Store the cluster configuration so it
# can be used for subsequent calls
+ print("Setting config")
await self.set_config(cluster_uuid, config)
else:
# Login to the k8s cluster
if not self.authenticated:
- await self.login()
+ await self.login(cluster_uuid)
# We're creating a new cluster
- print("Getting model {}".format(self.get_namespace(cluster_uuid)))
- model = await self.get_model(self.get_namespace(cluster_uuid))
+ print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
+ model = await self.get_model(
+ self.get_namespace(cluster_uuid),
+ cluster_uuid=cluster_uuid
+ )
# Disconnect from the model
if model and model.is_connected():
await model.disconnect()
- return cluster_uuid
+ return cluster_uuid, True
"""Repo Management"""
async def repo_add(
"""Reset"""
async def reset(
- self,
- cluster_uuid: str,
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False
) -> bool:
"""Reset a cluster
try:
if not self.authenticated:
- await self.login()
+ await self.login(cluster_uuid)
if self.controller.is_connected():
# Destroy the model
print("Caught exception during reset: {}".format(ex))
"""Deployment"""
+
async def install(
self,
cluster_uuid: str,
kdu_model: str,
atomic: bool = True,
- timeout: int = None,
+ timeout: float = 300,
params: dict = None,
- ) -> str:
+ db_dict: dict = None
+ ) -> bool:
"""Install a bundle
:param cluster_uuid str: The UUID of the cluster to install to
if not self.authenticated:
print("[install] Logging in to the controller")
- await self.login()
+ await self.login(cluster_uuid)
##
# Get or create the model, based on the namespace the cluster was
# instantiated with.
namespace = self.get_namespace(cluster_uuid)
- model = await self.get_model(namespace)
+ namespace = "gitlab-demo"
+ self.log.debug("Checking for model named {}".format(namespace))
+ model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
if not model:
# Create the new model
- model = await self.add_model(namespace)
+ self.log.debug("Adding model: {}".format(namespace))
+ model = await self.add_model(namespace, cluster_uuid=cluster_uuid)
if model:
# TODO: Instantiation parameters
- print("[install] deploying {}".format(kdu_model))
- await model.deploy(kdu_model)
+ """
+ "Juju bundle that models the KDU, in any of the following ways:
+ - <juju-repo>/<juju-bundle>
+ - <juju-bundle folder under k8s_models folder in the package>
+ - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
+ - <URL_where_to_fetch_juju_bundle>
+ """
+
+ bundle = kdu_model
+ if kdu_model.startswith("cs:"):
+ bundle = kdu_model
+ elif kdu_model.startswith("http"):
+ # Download the file
+ pass
+ else:
+ # Local file
+
+ # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
+ # Uncompress temporarily
+ # bundle = <uncompressed file>
+ pass
+
+ if not bundle:
+ # Raise named exception that the bundle could not be found
+ raise Exception()
+
+ print("[install] deploying {}".format(bundle))
+ await model.deploy(bundle)
# Get the application
if atomic:
initial release.
"""
namespace = self.get_namespace(cluster_uuid)
- model = await self.get_model(namespace)
+ model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
with open(kdu_model, 'r') as f:
- bundle = yaml.load(f, Loader=yaml.FullLoader)
+ bundle = yaml.safe_load(f)
"""
{
removed = False
# Remove an application from the model
- model = await self.get_model(self.get_namespace(cluster_uuid))
+ model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
if model:
# Get the application
kdu = {}
with open(kdu_model, 'r') as f:
- bundle = yaml.load(f, Loader=yaml.FullLoader)
+ bundle = yaml.safe_load(f)
"""
{
"""
status = {}
- model = await self.get_model(self.get_namespace(cluster_uuid))
+ model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
# model = await self.get_model_by_uuid(cluster_uuid)
if model:
async def add_k8s(
self,
cloud_name: str,
- credentials: dict,
+ credentials: str,
) -> bool:
"""Add a k8s cloud to Juju
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- input=yaml.dump(credentials, Dumper=yaml.Dumper),
- encoding='ascii'
+ # input=yaml.dump(credentials, Dumper=yaml.Dumper).encode("utf-8"),
+ input=credentials.encode("utf-8"),
+ # encoding='ascii'
)
retcode = p.returncode
+ print("add-k8s return code: {}".format(retcode))
if retcode > 0:
raise Exception(p.stderr)
async def add_model(
self,
- model_name: str
+ model_name: str,
+ cluster_uuid: str,
) -> juju.model.Model:
"""Adds a model to the controller
raises an exception.
"""
if not self.authenticated:
- await self.login()
+ await self.login(cluster_uuid)
+ self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
model = await self.controller.add_model(
model_name,
config={'authorized-keys': self.juju_public_key}
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding='ascii'
+ # encoding='ascii'
)
retcode = p.returncode
if retcode > 0:
#
- if 'already exists' not in p.stderr:
+ if b'already exists' not in p.stderr:
raise Exception(p.stderr)
return True
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding='ascii'
+ # encoding='ascii'
)
retcode = p.returncode
cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
if os.path.exists(cluster_config):
with open(cluster_config, 'r') as f:
- config = yaml.load(f.read(), Loader=yaml.FullLoader)
+ config = yaml.safe_load(f.read())
return config
else:
raise Exception(
async def get_model(
self,
model_name: str,
+ cluster_uuid: str,
) -> juju.model.Model:
"""Get a model from the Juju Controller.
:return The juju.model.Model object if found, or None.
"""
if not self.authenticated:
- await self.login()
+ await self.login(cluster_uuid)
model = None
models = await self.controller.list_models()
-
+ self.log.debug(models)
if model_name in models:
+ self.log.debug("Found model: {}".format(model_name))
model = await self.controller.get_model(
model_name
)
def is_microk8s_by_credentials(
self,
- credentials: dict,
+ credentials: str,
) -> bool:
"""Check if a cluster is micro8s
:param credentials dict: A dictionary containing the k8s credentials
:returns: A boolean if the cluster is running microk8s
"""
- for context in credentials['contexts']:
- if 'microk8s' in context['name']:
- return True
+ creds = yaml.safe_load(credentials)
+ if creds:
+ for context in creds['contexts']:
+ if 'microk8s' in context['name']:
+ return True
return False
- async def login(self):
+ async def login(self, cluster_uuid):
"""Login to the Juju controller."""
if self.authenticated:
self.connecting = True
+ # Test: Make sure we have the credentials loaded
+ config = self.get_config(cluster_uuid)
+
+ self.juju_endpoint = config['endpoint']
+ self.juju_user = config['username']
+ self.juju_secret = config['secret']
+ self.juju_ca_cert = config['cacert']
+ self.juju_public_key = None
+
self.controller = Controller()
if self.juju_secret:
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding='ascii'
+ # encoding='ascii'
)
retcode = p.returncode
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding='ascii'
+ # encoding='ascii'
)
retcode = p.returncode