+ async def get_services(self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ namespace: str) -> list:
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug(
+ "get_services: cluster_uuid: {}, kdu_instance: {}".format(
+ cluster_uuid, kdu_instance
+ )
+ )
+
+ status = await self._status_kdu(
+ cluster_id, kdu_instance, return_text=False
+ )
+
+ service_names = self._parse_helm_status_service_info(status)
+ service_list = []
+ for service in service_names:
+ service = await self.get_service(cluster_uuid, service, namespace)
+ service_list.append(service)
+
+ return service_list
+
+ async def get_service(self,
+ cluster_uuid: str,
+ service_name: str,
+ namespace: str) -> object:
+
+ self.log.debug(
+ "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
+ service_name, namespace, cluster_uuid)
+ )
+
+ # get paths
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
+ self.kubectl_command, config_filename, namespace, service_name
+ )
+
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
+
+ data = yaml.load(output, Loader=yaml.SafeLoader)
+
+ service = {
+ "name": service_name,
+ "type": self._get_deep(data, ("spec", "type")),
+ "ports": self._get_deep(data, ("spec", "ports")),
+ "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+ }
+ if service["type"] == "LoadBalancer":
+ ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
+ ip_list = [elem["ip"] for elem in ip_map_list]
+ service["external_ip"] = ip_list
+
+ return service
+