from juju.model import Model
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
-
+from n2vc.kubectl import Kubectl
from .exceptions import MethodNotImplemented
return status
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str = None) -> list:
- """
- Returns empty list as currently add_repo is not implemented
- """
- raise MethodNotImplemented
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
+ """Return a list of services of a kdu_instance"""
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str = None) -> object:
- """
- Returns empty list as currently add_repo is not implemented
- """
- raise MethodNotImplemented
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+ return kubectl.get_services(
+ field_selector="metadata.namespace={}".format(kdu_instance)
+ )
+
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
+ """Return data for a specific service inside a namespace"""
+
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+
+ return kubectl.get_services(
+ field_selector="metadata.name={},metadata.namespace={}".format(
+ service_name, namespace
+ )
+ )[0]
# Private methods
async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
if "already exists" not in stderr:
raise Exception(stderr)
+ def get_config_file(self, cluster_uuid: str) -> str:
+ """
+ Get Cluster Kubeconfig location
+ """
+ return "{}/{}/.kube/config".format(self.fs.path, cluster_uuid)
+
def get_config(self, cluster_uuid: str,) -> dict:
"""Get the cluster configuration