# See the License for the specific language governing permissions and
# limitations under the License.
+import asyncio
import concurrent
from .exceptions import NotImplemented
+import io
import juju
# from juju.bundle import BundleHandler
from juju.controller import Controller
import os
# import re
# import ssl
-import subprocess
# from .vnf import N2VC
import uuid
namespace: str = 'kube-system',
reuse_cluster_uuid: str = None,
) -> (str, bool):
- """Initialize a Kubernetes environment
-
- :param k8s_creds dict: A dictionary containing the Kubernetes cluster
- configuration
- :param namespace str: The Kubernetes namespace to initialize
+ """
+ It prepares a given K8s cluster environment to run Juju bundles.
- :return: UUID of the k8s context or raises an exception
+ :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
+ :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
+ :param reuse_cluster_uuid: existing cluster uuid for reuse
+ :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
+ (on error, an exception will be raised)
"""
"""Bootstrapping
# TODO: Pull info from db based on the namespace #
##################################################
+ ###################################################
+ # TODO: Make it idempotent, calling add-k8s and #
+ # bootstrap whenever reuse_cluster_uuid is passed #
+ # as parameter #
+ # `init_env` is called to initialize the K8s #
+ # cluster for juju. If this initialization fails, #
+ # it can be called again by LCM with the param #
+ # reuse_cluster_uuid, e.g. to try to fix it. #
+ ###################################################
+
if not reuse_cluster_uuid:
# This is a new cluster, so bootstrap it
cluster_uuid = str(uuid.uuid4())
- # Add k8s cloud to Juju (unless it's microk8s)
+ # Is a local k8s cluster?
+ localk8s = self.is_local_k8s(k8s_creds)
- # Does the kubeconfig contain microk8s?
- microk8s = self.is_microk8s_by_credentials(k8s_creds)
+ # If the k8s is external, the juju controller needs a loadbalancer
+ loadbalancer = False if localk8s else True
# Name the new k8s cloud
- k8s_cloud = "{}-k8s".format(namespace)
+ k8s_cloud = "k8s-{}".format(cluster_uuid)
print("Adding k8s cloud {}".format(k8s_cloud))
await self.add_k8s(k8s_cloud, k8s_creds)
# Bootstrap Juju controller
print("Bootstrapping...")
- await self.bootstrap(k8s_cloud, cluster_uuid, microk8s)
+ await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
print("Bootstrap done.")
# Get the controller information
'secret': self.juju_secret,
'cacert': self.juju_ca_cert,
'namespace': namespace,
- 'microk8s': microk8s,
+ 'loadbalancer': loadbalancer,
}
# Store the cluster configuration so it
await self.login(cluster_uuid)
# We're creating a new cluster
- print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
- model = await self.get_model(
- self.get_namespace(cluster_uuid),
- cluster_uuid=cluster_uuid
- )
+ #print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
+ #model = await self.get_model(
+ # self.get_namespace(cluster_uuid),
+ # cluster_uuid=cluster_uuid
+ #)
- # Disconnect from the model
- if model and model.is_connected():
- await model.disconnect()
+ ## Disconnect from the model
+ #if model and model.is_connected():
+ # await model.disconnect()
return cluster_uuid, True
):
raise NotImplemented()
+ async def synchronize_repos(
+ self,
+ cluster_uuid: str,
+ name: str
+ ):
+ """
+ Returns None as currently add_repo is not implemented
+ """
+ return None
+
"""Reset"""
async def reset(
self,
# Disconnect from the controller
print("[reset] Disconnecting controller")
- await self.controller.disconnect()
+ await self.logout()
# Destroy the controller (via CLI)
print("[reset] Destroying controller")
await self.destroy_controller(cluster_uuid)
- """Remove the k8s cloud
-
- Only remove the k8s cloud if it's not a microk8s cloud,
- since microk8s is a built-in cloud type.
- """
- # microk8s = self.is_microk8s_by_cluster_uuid(cluster_uuid)
- # if not microk8s:
print("[reset] Removing k8s cloud")
- namespace = self.get_namespace(cluster_uuid)
- k8s_cloud = "{}-k8s".format(namespace)
+ k8s_cloud = "k8s-{}".format(cluster_uuid)
await self.remove_cloud(k8s_cloud)
except Exception as ex:
print("Caught exception during reset: {}".format(ex))
+ return True
+
"""Deployment"""
async def install(
atomic: bool = True,
timeout: float = 300,
params: dict = None,
- db_dict: dict = None
+ db_dict: dict = None,
+ kdu_name: str = None
) -> bool:
"""Install a bundle
:param timeout int: The time, in seconds, to wait for the install
to finish
:param params dict: Key-value pairs of instantiation parameters
+ :param kdu_name: Name of the KDU instance to be installed
:return: If successful, returns ?
"""
await self.login(cluster_uuid)
##
- # Get or create the model, based on the namespace the cluster was
- # instantiated with.
- namespace = self.get_namespace(cluster_uuid)
+ # Get or create the model, based on the NS
+ # uuid.
+ if kdu_name:
+ kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
+ else:
+ kdu_instance = db_dict["filter"]["_id"]
- self.log.debug("Checking for model named {}".format(namespace))
- model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
- if not model:
- # Create the new model
- self.log.debug("Adding model: {}".format(namespace))
- model = await self.add_model(namespace, cluster_uuid=cluster_uuid)
+ self.log.debug("Checking for model named {}".format(kdu_instance))
+
+ # Create the new model
+ self.log.debug("Adding model: {}".format(kdu_instance))
+ model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
if model:
# TODO: Instantiation parameters
print("[install] Disconnecting model")
await model.disconnect()
- return True
+ return kdu_instance
raise Exception("Unable to install")
async def instances_list(
async def uninstall(
self,
cluster_uuid: str,
- kdu_instance: str,
+ kdu_instance: str
) -> bool:
"""Uninstall a KDU instance
- :param cluster_uuid str: The UUID of the cluster to uninstall
+ :param cluster_uuid str: The UUID of the cluster
:param kdu_instance str: The unique name of the KDU instance
:return: Returns True if successful, or raises an exception
"""
- removed = False
-
- # Remove an application from the model
- model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
-
- if model:
- # Get the application
- if kdu_instance not in model.applications:
- # TODO: Raise a named exception
- raise Exception("Application not found.")
+ if not self.authenticated:
+ self.log.debug("[uninstall] Connecting to controller")
+ await self.login(cluster_uuid)
- application = model.applications[kdu_instance]
+ self.log.debug("[uninstall] Destroying model")
- # Destroy the application
- await application.destroy()
+ await self.controller.destroy_models(kdu_instance)
- # TODO: Verify removal
+ self.log.debug("[uninstall] Model destroyed and disconnecting")
+ await self.logout()
- removed = True
- return removed
+ return True
"""Introspection"""
async def inspect_kdu(
cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
print(cmd)
- p = subprocess.run(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- # input=yaml.dump(credentials, Dumper=yaml.Dumper).encode("utf-8"),
- input=credentials.encode("utf-8"),
- # encoding='ascii'
+
+ process = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ stdin=asyncio.subprocess.PIPE,
)
- retcode = p.returncode
- print("add-k8s return code: {}".format(retcode))
- if retcode > 0:
- raise Exception(p.stderr)
+ # Feed the process the credentials
+ process.stdin.write(credentials.encode("utf-8"))
+ await process.stdin.drain()
+ process.stdin.close()
+
+ stdout, stderr = await process.communicate()
+
+ return_code = process.returncode
+
+ print("add-k8s return code: {}".format(return_code))
+
+ if return_code > 0:
+ raise Exception(stderr)
+
return True
async def add_model(
await self.login(cluster_uuid)
self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
- model = await self.controller.add_model(
- model_name,
- config={'authorized-keys': self.juju_public_key}
- )
+ try:
+ model = await self.controller.add_model(
+ model_name,
+ config={'authorized-keys': self.juju_public_key}
+ )
+ except Exception as ex:
+ self.log.debug(ex)
+ self.log.debug("Caught exception: {}".format(ex))
+ pass
+
return model
async def bootstrap(
self,
cloud_name: str,
cluster_uuid: str,
- microk8s: bool
+ loadbalancer: bool
) -> bool:
"""Bootstrap a Kubernetes controller
:param cloud_name str: The name of the cloud.
:param cluster_uuid str: The UUID of the cluster to bootstrap.
- :param microk8s bool: If this is a microk8s cluster.
+ :param loadbalancer bool: If the controller should use loadbalancer or not.
:returns: True upon success or raises an exception.
"""
- if microk8s:
+ if not loadbalancer:
cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
else:
"""
- For non-microk8s clusters, specify that the controller service is using a LoadBalancer.
+ For public clusters, specify that the controller service is using a LoadBalancer.
"""
cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]
cluster_uuid, cloud_name
))
- p = subprocess.run(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- # encoding='ascii'
+ process = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
)
- retcode = p.returncode
- if retcode > 0:
+ stdout, stderr = await process.communicate()
+
+ return_code = process.returncode
+
+ if return_code > 0:
#
- if b'already exists' not in p.stderr:
- raise Exception(p.stderr)
+ if b'already exists' not in stderr:
+ raise Exception(stderr)
return True
cluster_uuid
]
- p = subprocess.run(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- # encoding='ascii'
+ process = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
)
- retcode = p.returncode
- if retcode > 0:
+ stdout, stderr = await process.communicate()
+
+ return_code = process.returncode
+
+ if return_code > 0:
#
- if 'already exists' not in p.stderr:
- raise Exception(p.stderr)
+ if 'already exists' not in stderr:
+ raise Exception(stderr)
def get_config(
self,
return True
return False
- def is_microk8s_by_cluster_uuid(
- self,
- cluster_uuid: str,
- ) -> bool:
- """Check if a cluster is micro8s
-
- Checks if a cluster is running microk8s
-
- :param cluster_uuid str: The UUID of the cluster
- :returns: A boolean if the cluster is running microk8s
- """
- config = self.get_config(cluster_uuid)
- return config['microk8s']
-
- def is_microk8s_by_credentials(
+ def is_local_k8s(
self,
credentials: str,
) -> bool:
- """Check if a cluster is micro8s
+ """Check if a cluster is local
- Checks if a cluster is running microk8s
+ Checks if a cluster is running in the local host
:param credentials dict: A dictionary containing the k8s credentials
- :returns: A boolean if the cluster is running microk8s
+ :returns: A boolean if the cluster is running locally
"""
creds = yaml.safe_load(credentials)
- if creds:
- for context in creds['contexts']:
- if 'microk8s' in context['name']:
- return True
+ if os.getenv("OSMLCM_VCA_APIPROXY"):
+ host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
+
+ if creds and host_ip:
+ for cluster in creds['clusters']:
+ if 'server' in cluster['cluster']:
+ if host_ip in cluster['cluster']['server']:
+ return True
return False
# Remove the bootstrapped controller
cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
- p = subprocess.run(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- # encoding='ascii'
+ process = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
)
- retcode = p.returncode
- if retcode > 0:
- raise Exception(p.stderr)
+ stdout, stderr = await process.communicate()
+
+ return_code = process.returncode
+
+ if return_code > 0:
+ raise Exception(stderr)
# Remove the cloud from the local config
cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
- p = subprocess.run(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- # encoding='ascii'
+ process = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
)
- retcode = p.returncode
- if retcode > 0:
- raise Exception(p.stderr)
+ stdout, stderr = await process.communicate()
+
+ return_code = process.returncode
+ if return_code > 0:
+ raise Exception(stderr)
return True