from juju.model import Model
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
-
+from n2vc.kubectl import Kubectl
from .exceptions import MethodNotImplemented
return status
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str = None) -> list:
- """
- Returns empty list as currently add_repo is not implemented
- """
- raise MethodNotImplemented
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
+ """Return a list of services of a kdu_instance"""
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str = None) -> object:
- """
- Returns empty list as currently add_repo is not implemented
- """
- raise MethodNotImplemented
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+ return kubectl.get_services(
+ field_selector="metadata.namespace={}".format(kdu_instance)
+ )
+
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
+ """Return data for a specific service inside a namespace"""
+
+ config_file = self.get_config_file(cluster_uuid=cluster_uuid)
+ kubectl = Kubectl(config_file=config_file)
+
+ return kubectl.get_services(
+ field_selector="metadata.name={},metadata.namespace={}".format(
+ service_name, namespace
+ )
+ )[0]
# Private methods
async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
if "already exists" not in stderr:
raise Exception(stderr)
+ def get_config_file(self, cluster_uuid: str) -> str:
+ """
+ Get Cluster Kubeconfig location
+ """
+ return "{}/{}/.kube/config".format(self.fs.path, cluster_uuid)
+
def get_config(self, cluster_uuid: str,) -> dict:
"""Get the cluster configuration
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kubernetes import client, config
+from kubernetes.client.rest import ApiException
+import logging
+
+
+class Kubectl:
+ def __init__(self, config_file=None):
+ config.load_kube_config(config_file=config_file)
+ self.logger = logging.getLogger("Kubectl")
+
+ def get_services(self, field_selector=None, label_selector=None):
+ kwargs = {}
+ if field_selector:
+ kwargs["field_selector"] = field_selector
+ if label_selector:
+ kwargs["label_selector"] = label_selector
+
+ try:
+ v1 = client.CoreV1Api()
+ result = v1.list_service_for_all_namespaces(**kwargs)
+ return [
+ {
+ "name": i.metadata.name,
+ "cluster_ip": i.spec.cluster_ip,
+ "type": i.spec.type,
+ "ports": i.spec.ports,
+ "external_ip": [i.ip for i in i.status.load_balancer.ingress]
+ if i.status.load_balancer.ingress
+ else None,
+ }
+ for i in result.items
+ ]
+ except ApiException as e:
+ self.logger.error("Error calling get services: {}".format(e))
+ raise e
--- /dev/null
+# Copyright 2020 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import TestCase, mock
+from n2vc.kubectl import Kubectl
+from n2vc.utils import Dict
+from kubernetes.client.rest import ApiException
+
+fake_list_services = Dict(
+ {
+ "items": [
+ Dict(
+ {
+ "metadata": Dict(
+ {
+ "name": "squid",
+ "namespace": "test",
+ "labels": {"juju-app": "squid"},
+ }
+ ),
+ "spec": Dict(
+ {
+ "cluster_ip": "10.152.183.79",
+ "type": "LoadBalancer",
+ "ports": [
+ {
+ "name": None,
+ "node_port": None,
+ "port": 30666,
+ "protocol": "TCP",
+ "target_port": 30666,
+ }
+ ],
+ }
+ ),
+ "status": Dict(
+ {
+ "load_balancer": Dict(
+ {
+ "ingress": [
+ Dict({"hostname": None, "ip": "192.168.0.201"})
+ ]
+ }
+ )
+ }
+ ),
+ }
+ )
+ ]
+ }
+)
+
+
+class FakeCoreV1Api:
+ def list_service_for_all_namespaces(self, **kwargs):
+ return fake_list_services
+
+
+class ProvisionerTest(TestCase):
+ @mock.patch("n2vc.kubectl.config.load_kube_config")
+ @mock.patch("n2vc.kubectl.client.CoreV1Api")
+ def setUp(self, mock_core, mock_config):
+ mock_core.return_value = mock.MagicMock()
+ mock_config.return_value = mock.MagicMock()
+ self.kubectl = Kubectl()
+
+ @mock.patch("n2vc.kubectl.client.CoreV1Api")
+ def test_get_service(self, mock_corev1api):
+ mock_corev1api.return_value = FakeCoreV1Api()
+ services = self.kubectl.get_services(
+ field_selector="metadata.namespace", label_selector="juju-operator=squid"
+ )
+ keys = ["name", "cluster_ip", "type", "ports", "external_ip"]
+ self.assertTrue(k in service for service in services for k in keys)
+
+ @mock.patch("n2vc.kubectl.client.CoreV1Api.list_service_for_all_namespaces")
+ def test_get_service_exception(self, list_services):
+ list_services.side_effect = ApiException()
+ with self.assertRaises(ApiException):
+ self.kubectl.get_services()
git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
juju==2.8.1
paramiko
-pyasn1>=0.4.4
\ No newline at end of file
+pyasn1>=0.4.4
+kubernetes==10.0.1
\ No newline at end of file
'juju==2.8.1',
'paramiko',
'pyasn1>=0.4.4',
+ 'kubernetes==10.0.1'
],
include_package_data=True,
maintainer='Adam Israel',