from juju.model import Model
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
-
+from n2vc.kubectl import Kubectl
from .exceptions import MethodNotImplemented
# reuse_cluster_uuid, e.g. to try to fix it. #
###################################################
- if not reuse_cluster_uuid:
- # This is a new cluster, so bootstrap it
-
- cluster_uuid = str(uuid.uuid4())
-
- # Is a local k8s cluster?
- localk8s = self.is_local_k8s(k8s_creds)
+ # This is a new cluster, so bootstrap it
- # If the k8s is external, the juju controller needs a loadbalancer
- loadbalancer = False if localk8s else True
+ cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
- # Name the new k8s cloud
- k8s_cloud = "k8s-{}".format(cluster_uuid)
+ # Is a local k8s cluster?
+ localk8s = self.is_local_k8s(k8s_creds)
- self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
- await self.add_k8s(k8s_cloud, k8s_creds)
+ # If the k8s is external, the juju controller needs a loadbalancer
+ loadbalancer = False if localk8s else True
- # Bootstrap Juju controller
- self.log.debug("Bootstrapping...")
- await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
- self.log.debug("Bootstrap done.")
+ # Name the new k8s cloud
+ k8s_cloud = "k8s-{}".format(cluster_uuid)
- # Get the controller information
+ self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
+ await self.add_k8s(k8s_cloud, k8s_creds)
- # Parse ~/.local/share/juju/controllers.yaml
- # controllers.testing.api-endpoints|ca-cert|uuid
- self.log.debug("Getting controller endpoints")
- with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
- controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers["controllers"][cluster_uuid]
- endpoints = controller["api-endpoints"]
- self.juju_endpoint = endpoints[0]
- self.juju_ca_cert = controller["ca-cert"]
+ # Bootstrap Juju controller
+ self.log.debug("Bootstrapping...")
+ await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
+ self.log.debug("Bootstrap done.")
- # Parse ~/.local/share/juju/accounts
- # controllers.testing.user|password
- self.log.debug("Getting accounts")
- with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
- controllers = yaml.load(f, Loader=yaml.Loader)
- controller = controllers["controllers"][cluster_uuid]
+ # Get the controller information
- self.juju_user = controller["user"]
- self.juju_secret = controller["password"]
+ # Parse ~/.local/share/juju/controllers.yaml
+ # controllers.testing.api-endpoints|ca-cert|uuid
+ self.log.debug("Getting controller endpoints")
+ with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
+ controllers = yaml.load(f, Loader=yaml.Loader)
+ controller = controllers["controllers"][cluster_uuid]
+ endpoints = controller["api-endpoints"]
+ self.juju_endpoint = endpoints[0]
+ self.juju_ca_cert = controller["ca-cert"]
- # raise Exception("EOL")
+ # Parse ~/.local/share/juju/accounts
+ # controllers.testing.user|password
+ self.log.debug("Getting accounts")
+ with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
+ controllers = yaml.load(f, Loader=yaml.Loader)
+ controller = controllers["controllers"][cluster_uuid]
- self.juju_public_key = None
+ self.juju_user = controller["user"]
+ self.juju_secret = controller["password"]
- config = {
- "endpoint": self.juju_endpoint,
- "username": self.juju_user,
- "secret": self.juju_secret,
- "cacert": self.juju_ca_cert,
- "namespace": namespace,
- "loadbalancer": loadbalancer,
- }
-
- # Store the cluster configuration so it
- # can be used for subsequent calls
- self.log.debug("Setting config")
- await self.set_config(cluster_uuid, config)
+ # raise Exception("EOL")
- else:
- # This is an existing cluster, so get its config
- cluster_uuid = reuse_cluster_uuid
+ self.juju_public_key = None
- config = self.get_config(cluster_uuid)
+ config = {
+ "endpoint": self.juju_endpoint,
+ "username": self.juju_user,
+ "secret": self.juju_secret,
+ "cacert": self.juju_ca_cert,
+ "namespace": namespace,
+ "loadbalancer": loadbalancer,
+ }
- self.juju_endpoint = config["endpoint"]
- self.juju_user = config["username"]
- self.juju_secret = config["secret"]
- self.juju_ca_cert = config["cacert"]
- self.juju_public_key = None
+ # Store the cluster configuration so it
+ # can be used for subsequent calls
+ self.log.debug("Setting config")
+ await self.set_config(cluster_uuid, config)
# Login to the k8s cluster
if not self.authenticated:
in the package>
- <URL_where_to_fetch_juju_bundle>
"""
-
- previous_workdir = os.getcwd()
+ try:
+ previous_workdir = os.getcwd()
+ except FileNotFoundError:
+ previous_workdir = "/app/storage"
bundle = kdu_model
if kdu_model.startswith("cs:"):
return status
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
+ """Return a list of services of a kdu_instance"""
+
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+ return kubectl.get_services(
+ field_selector="metadata.namespace={}".format(kdu_instance)
+ )
+
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
+ """Return data for a specific service inside a namespace"""
+
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+
+ return kubectl.get_services(
+ field_selector="metadata.name={},metadata.namespace={}".format(
+ service_name, namespace
+ )
+ )[0]
+
# Private methods
async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
"""Add a k8s cloud to Juju
self.log.debug(
"Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
)
+ model = None
try:
if self.juju_public_key is not None:
model = await self.controller.add_model(
if "already exists" not in stderr:
raise Exception(stderr)
+ def get_config_file(self, cluster_uuid: str) -> str:
+ """
+ Get Cluster Kubeconfig location
+ """
+ return "{}/{}/.kube/config".format(self.fs.path, cluster_uuid)
+
def get_config(self, cluster_uuid: str,) -> dict:
"""Get the cluster configuration
:param credentials dict: A dictionary containing the k8s credentials
:returns: A boolean if the cluster is running locally
"""
+
creds = yaml.safe_load(credentials)
- if os.getenv("OSMLCM_VCA_APIPROXY"):
- host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
- if creds and host_ip:
+ if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
for cluster in creds["clusters"]:
if "server" in cluster["cluster"]:
- if host_ip in cluster["cluster"]["server"]:
+ if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
return True
return False