from juju.model import Model
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
-
+from n2vc.kubectl import Kubectl
from .exceptions import MethodNotImplemented
in the package>
- <URL_where_to_fetch_juju_bundle>
"""
-
- previous_workdir = os.getcwd()
+ try:
+ previous_workdir = os.getcwd()
+ except FileNotFoundError:
+ previous_workdir = "/app/storage"
bundle = kdu_model
if kdu_model.startswith("cs:"):
return status
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
+ """Return a list of services of a kdu_instance"""
+
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+ return kubectl.get_services(
+ field_selector="metadata.namespace={}".format(kdu_instance)
+ )
+
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
+ """Return data for a specific service inside a namespace"""
+
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+
+ return kubectl.get_services(
+ field_selector="metadata.name={},metadata.namespace={}".format(
+ service_name, namespace
+ )
+ )[0]
+
# Private methods
async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
"""Add a k8s cloud to Juju
if "already exists" not in stderr:
raise Exception(stderr)
+ def get_config_file(self, cluster_uuid: str) -> str:
+ """
+ Get Cluster Kubeconfig location
+ """
+ return "{}/{}/.kube/config".format(self.fs.path, cluster_uuid)
+
def get_config(self, cluster_uuid: str,) -> dict:
"""Get the cluster configuration