Use kubectl module from n2vc and remove it from odu_libs 28/14528/1 master v16.0 release-v16.0-start v16.0.0
authorgarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 22 Aug 2024 08:05:47 +0000 (10:05 +0200)
committergarciadeblas <gerardo.garciadeblas@telefonica.com>
Thu, 22 Aug 2024 08:05:47 +0000 (10:05 +0200)
Change-Id: I08a715564f32623dda1608937c7d35bb212a4961
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
osm_lcm/odu_libs/kubectl.py [deleted file]
osm_lcm/odu_workflows.py

diff --git a/osm_lcm/odu_libs/kubectl.py b/osm_lcm/odu_libs/kubectl.py
deleted file mode 100644 (file)
index a7f0cee..0000000
+++ /dev/null
@@ -1,829 +0,0 @@
-#######################################################################################
-# Copyright 2020 Canonical Ltd.
-# Copyright ETSI Contributors and Others.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-#     Unless required by applicable law or agreed to in writing, software
-#     distributed under the License is distributed on an "AS IS" BASIS,
-#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#     See the License for the specific language governing permissions and
-#     limitations under the License.
-#######################################################################################
-
-import base64
-import logging
-from typing import Dict
-import typing
-import uuid
-import json
-
-from distutils.version import LooseVersion
-
-from kubernetes import client, config
-from kubernetes.client.api import VersionApi
-from kubernetes.client.models import (
-    V1ClusterRole,
-    V1Role,
-    V1ObjectMeta,
-    V1PolicyRule,
-    V1ServiceAccount,
-    V1ClusterRoleBinding,
-    V1RoleBinding,
-    V1RoleRef,
-    RbacV1Subject,
-    V1Secret,
-    V1SecretReference,
-    V1Namespace,
-)
-from kubernetes.client.rest import ApiException
-from n2vc.libjuju import retry_callback
-from retrying_async import retry
-
-
-SERVICE_ACCOUNT_TOKEN_KEY = "token"
-SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
-# clients
-CORE_CLIENT = "core_v1"
-RBAC_CLIENT = "rbac_v1"
-STORAGE_CLIENT = "storage_v1"
-CUSTOM_OBJECT_CLIENT = "custom_object"
-
-
-class Kubectl:
-    def __init__(self, config_file=None):
-        config.load_kube_config(config_file=config_file)
-        self._clients = {
-            CORE_CLIENT: client.CoreV1Api(),
-            RBAC_CLIENT: client.RbacAuthorizationV1Api(),
-            STORAGE_CLIENT: client.StorageV1Api(),
-            CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
-        }
-        self._configuration = config.kube_config.Configuration.get_default_copy()
-        self.logger = logging.getLogger("lcm.odu")
-
-    @property
-    def configuration(self):
-        return self._configuration
-
-    @property
-    def clients(self):
-        return self._clients
-
-    def get_services(
-        self,
-        field_selector: str = None,
-        label_selector: str = None,
-    ) -> typing.List[typing.Dict]:
-        """
-        Get Service list from a namespace
-
-        :param: field_selector:     Kubernetes field selector for the namespace
-        :param: label_selector:     Kubernetes label selector for the namespace
-
-        :return: List of the services matching the selectors specified
-        """
-        kwargs = {}
-        if field_selector:
-            kwargs["field_selector"] = field_selector
-        if label_selector:
-            kwargs["label_selector"] = label_selector
-        try:
-            result = self.clients[CORE_CLIENT].list_service_for_all_namespaces(**kwargs)
-            return [
-                {
-                    "name": i.metadata.name,
-                    "cluster_ip": i.spec.cluster_ip,
-                    "type": i.spec.type,
-                    "ports": (
-                        [
-                            {
-                                "name": p.name,
-                                "node_port": p.node_port,
-                                "port": p.port,
-                                "protocol": p.protocol,
-                                "target_port": p.target_port,
-                            }
-                            for p in i.spec.ports
-                        ]
-                        if i.spec.ports
-                        else []
-                    ),
-                    "external_ip": [i.ip for i in i.status.load_balancer.ingress]
-                    if i.status.load_balancer.ingress
-                    else None,
-                }
-                for i in result.items
-            ]
-        except ApiException as e:
-            self.logger.error("Error calling get services: {}".format(e))
-            raise e
-
-    def get_default_storage_class(self) -> str:
-        """
-        Default storage class
-
-        :return:    Returns the default storage class name, if exists.
-                    If not, it returns the first storage class.
-                    If there are not storage classes, returns None
-        """
-        storage_classes = self.clients[STORAGE_CLIENT].list_storage_class()
-        selected_sc = None
-        default_sc_annotations = {
-            "storageclass.kubernetes.io/is-default-class": "true",
-            # Older clusters still use the beta annotation.
-            "storageclass.beta.kubernetes.io/is-default-class": "true",
-        }
-        for sc in storage_classes.items:
-            if not selected_sc:
-                # Select the first storage class in case there is no a default-class
-                selected_sc = sc.metadata.name
-            annotations = sc.metadata.annotations or {}
-            if any(
-                k in annotations and annotations[k] == v
-                for k, v in default_sc_annotations.items()
-            ):
-                # Default storage
-                selected_sc = sc.metadata.name
-                break
-        return selected_sc
-
-    def create_cluster_role(
-        self,
-        name: str,
-        labels: Dict[str, str],
-        namespace: str = "kube-system",
-    ):
-        """
-        Create a cluster role
-
-        :param: name:       Name of the cluster role
-        :param: labels:     Labels for cluster role metadata
-        :param: namespace:  Kubernetes namespace for cluster role metadata
-                            Default: kube-system
-        """
-        cluster_roles = self.clients[RBAC_CLIENT].list_cluster_role(
-            field_selector="metadata.name={}".format(name)
-        )
-
-        if len(cluster_roles.items) > 0:
-            raise Exception("Role with metadata.name={} already exists".format(name))
-
-        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
-        # Cluster role
-        cluster_role = V1ClusterRole(
-            metadata=metadata,
-            rules=[
-                V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
-                V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
-            ],
-        )
-
-        self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
-
-    async def create_role(
-        self,
-        name: str,
-        labels: Dict[str, str],
-        api_groups: list,
-        resources: list,
-        verbs: list,
-        namespace: str,
-    ):
-        """
-        Create a role with one PolicyRule
-
-        :param: name:       Name of the namespaced Role
-        :param: labels:     Labels for namespaced Role metadata
-        :param: api_groups: List with api-groups allowed in the policy rule
-        :param: resources:  List with resources allowed in the policy rule
-        :param: verbs:      List with verbs allowed in the policy rule
-        :param: namespace:  Kubernetes namespace for Role metadata
-
-        :return: None
-        """
-
-        roles = self.clients[RBAC_CLIENT].list_namespaced_role(
-            namespace, field_selector="metadata.name={}".format(name)
-        )
-
-        if len(roles.items) > 0:
-            raise Exception("Role with metadata.name={} already exists".format(name))
-
-        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
-
-        role = V1Role(
-            metadata=metadata,
-            rules=[
-                V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
-            ],
-        )
-
-        self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
-
-    def delete_cluster_role(self, name: str):
-        """
-        Delete a cluster role
-
-        :param: name:       Name of the cluster role
-        """
-        self.clients[RBAC_CLIENT].delete_cluster_role(name)
-
-    def _get_kubectl_version(self):
-        version = VersionApi().get_code()
-        return "{}.{}".format(version.major, version.minor)
-
-    def _need_to_create_new_secret(self):
-        min_k8s_version = "1.24"
-        current_k8s_version = self._get_kubectl_version()
-        return LooseVersion(min_k8s_version) <= LooseVersion(current_k8s_version)
-
-    def _get_secret_name(self, service_account_name: str):
-        random_alphanum = str(uuid.uuid4())[:5]
-        return "{}-token-{}".format(service_account_name, random_alphanum)
-
-    def _create_service_account_secret(
-        self,
-        service_account_name: str,
-        namespace: str,
-        secret_name: str,
-    ):
-        """
-        Create a secret for the service account. K8s version >= 1.24
-
-        :param: service_account_name: Name of the service account
-        :param: namespace:  Kubernetes namespace for service account metadata
-        :param: secret_name: Name of the secret
-        """
-        v1_core = self.clients[CORE_CLIENT]
-        secrets = v1_core.list_namespaced_secret(
-            namespace, field_selector="metadata.name={}".format(secret_name)
-        ).items
-
-        if len(secrets) > 0:
-            raise Exception(
-                "Secret with metadata.name={} already exists".format(secret_name)
-            )
-
-        annotations = {"kubernetes.io/service-account.name": service_account_name}
-        metadata = V1ObjectMeta(
-            name=secret_name, namespace=namespace, annotations=annotations
-        )
-        type = "kubernetes.io/service-account-token"
-        secret = V1Secret(metadata=metadata, type=type)
-        v1_core.create_namespaced_secret(namespace, secret)
-
-    def _get_secret_reference_list(self, namespace: str, secret_name: str):
-        """
-        Return a secret reference list with one secret.
-        K8s version >= 1.24
-
-        :param: namespace:  Kubernetes namespace for service account metadata
-        :param: secret_name: Name of the secret
-        :rtype: list[V1SecretReference]
-        """
-        return [V1SecretReference(name=secret_name, namespace=namespace)]
-
-    def create_service_account(
-        self,
-        name: str,
-        labels: Dict[str, str],
-        namespace: str = "kube-system",
-    ):
-        """
-        Create a service account
-
-        :param: name:       Name of the service account
-        :param: labels:     Labels for service account metadata
-        :param: namespace:  Kubernetes namespace for service account metadata
-                            Default: kube-system
-        """
-        v1_core = self.clients[CORE_CLIENT]
-        service_accounts = v1_core.list_namespaced_service_account(
-            namespace, field_selector="metadata.name={}".format(name)
-        )
-        if len(service_accounts.items) > 0:
-            raise Exception(
-                "Service account with metadata.name={} already exists".format(name)
-            )
-
-        metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
-
-        if self._need_to_create_new_secret():
-            secret_name = self._get_secret_name(name)
-            secrets = self._get_secret_reference_list(namespace, secret_name)
-            service_account = V1ServiceAccount(metadata=metadata, secrets=secrets)
-            v1_core.create_namespaced_service_account(namespace, service_account)
-            self._create_service_account_secret(name, namespace, secret_name)
-        else:
-            service_account = V1ServiceAccount(metadata=metadata)
-            v1_core.create_namespaced_service_account(namespace, service_account)
-
-    def delete_secret(self, name: str, namespace: str = "kube-system"):
-        """
-        Delete a secret
-
-        :param: name:       Name of the secret
-        :param: namespace:  Kubernetes namespace
-                            Default: kube-system
-        """
-        self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
-
-    def delete_service_account(self, name: str, namespace: str = "kube-system"):
-        """
-        Delete a service account
-
-        :param: name:       Name of the service account
-        :param: namespace:  Kubernetes namespace for service account metadata
-                            Default: kube-system
-        """
-        self.clients[CORE_CLIENT].delete_namespaced_service_account(name, namespace)
-
-    def create_cluster_role_binding(
-        self, name: str, labels: Dict[str, str], namespace: str = "kube-system"
-    ):
-        """
-        Create a cluster role binding
-
-        :param: name:       Name of the cluster role
-        :param: labels:     Labels for cluster role binding metadata
-        :param: namespace:  Kubernetes namespace for cluster role binding metadata
-                            Default: kube-system
-        """
-        role_bindings = self.clients[RBAC_CLIENT].list_cluster_role_binding(
-            field_selector="metadata.name={}".format(name)
-        )
-        if len(role_bindings.items) > 0:
-            raise Exception("Generated rbac id already exists")
-
-        role_binding = V1ClusterRoleBinding(
-            metadata=V1ObjectMeta(name=name, labels=labels),
-            role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
-            subjects=[
-                RbacV1Subject(kind="ServiceAccount", name=name, namespace=namespace)
-            ],
-        )
-        self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
-
-    async def create_role_binding(
-        self,
-        name: str,
-        role_name: str,
-        sa_name: str,
-        labels: Dict[str, str],
-        namespace: str,
-    ):
-        """
-        Create a cluster role binding
-
-        :param: name:       Name of the namespaced Role Binding
-        :param: role_name:  Name of the namespaced Role to be bound
-        :param: sa_name:    Name of the Service Account to be bound
-        :param: labels:     Labels for Role Binding metadata
-        :param: namespace:  Kubernetes namespace for Role Binding metadata
-
-        :return: None
-        """
-        role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
-            namespace, field_selector="metadata.name={}".format(name)
-        )
-        if len(role_bindings.items) > 0:
-            raise Exception(
-                "Role Binding with metadata.name={} already exists".format(name)
-            )
-
-        role_binding = V1RoleBinding(
-            metadata=V1ObjectMeta(name=name, labels=labels),
-            role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
-            subjects=[
-                RbacV1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
-            ],
-        )
-        self.clients[RBAC_CLIENT].create_namespaced_role_binding(
-            namespace, role_binding
-        )
-
-    def delete_cluster_role_binding(self, name: str):
-        """
-        Delete a cluster role binding
-
-        :param: name:       Name of the cluster role binding
-        """
-        self.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
-
-    @retry(
-        attempts=10,
-        delay=1,
-        fallback=Exception("Failed getting the secret from service account"),
-        callback=retry_callback,
-    )
-    async def get_secret_data(
-        self, name: str, namespace: str = "kube-system"
-    ) -> (str, str):
-        """
-        Get secret data
-
-        :param: name:       Name of the secret data
-        :param: namespace:  Name of the namespace where the secret is stored
-
-        :return: Tuple with the token and client certificate
-        """
-        v1_core = self.clients[CORE_CLIENT]
-
-        secret_name = None
-
-        service_accounts = v1_core.list_namespaced_service_account(
-            namespace, field_selector="metadata.name={}".format(name)
-        )
-        if len(service_accounts.items) == 0:
-            raise Exception(
-                "Service account not found with metadata.name={}".format(name)
-            )
-        service_account = service_accounts.items[0]
-        if service_account.secrets and len(service_account.secrets) > 0:
-            secret_name = service_account.secrets[0].name
-        if not secret_name:
-            raise Exception(
-                "Failed getting the secret from service account {}".format(name)
-            )
-        # TODO: refactor to use get_secret_content
-        secret = v1_core.list_namespaced_secret(
-            namespace, field_selector="metadata.name={}".format(secret_name)
-        ).items[0]
-
-        token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
-        client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
-
-        return (
-            base64.b64decode(token).decode("utf-8"),
-            base64.b64decode(client_certificate_data).decode("utf-8"),
-        )
-
-    @retry(
-        attempts=10,
-        delay=1,
-        fallback=Exception("Failed getting data from the secret"),
-    )
-    async def get_secret_content(
-        self,
-        name: str,
-        namespace: str,
-    ) -> dict:
-        """
-        Get secret data
-
-        :param: name:       Name of the secret
-        :param: namespace:  Name of the namespace where the secret is stored
-
-        :return: Dictionary with secret's data
-        """
-        v1_core = self.clients[CORE_CLIENT]
-
-        secret = v1_core.read_namespaced_secret(name, namespace)
-
-        return secret.data
-
-    @retry(
-        attempts=10,
-        delay=1,
-        fallback=Exception("Failed creating the secret"),
-    )
-    async def create_secret(
-        self, name: str, data: dict, namespace: str, secret_type: str
-    ):
-        """
-        Create secret with data
-
-        :param: name:        Name of the secret
-        :param: data:        Dict with data content. Values must be already base64 encoded
-        :param: namespace:   Name of the namespace where the secret will be stored
-        :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
-
-        :return: None
-        """
-        self.logger.info("Enter create_secret function")
-        v1_core = self.clients[CORE_CLIENT]
-        self.logger.info(f"v1_core: {v1_core}")
-        metadata = V1ObjectMeta(name=name, namespace=namespace)
-        self.logger.info(f"metadata: {metadata}")
-        secret = V1Secret(metadata=metadata, data=data, type=secret_type)
-        self.logger.info(f"secret: {secret}")
-        v1_core.create_namespaced_secret(namespace, secret)
-        self.logger.info("Namespaced secret was created")
-
-    async def create_certificate(
-        self,
-        namespace: str,
-        name: str,
-        dns_prefix: str,
-        secret_name: str,
-        usages: list,
-        issuer_name: str,
-    ):
-        """
-        Creates cert-manager certificate object
-
-        :param: namespace:       Name of the namespace where the certificate and secret is stored
-        :param: name:            Name of the certificate object
-        :param: dns_prefix:      Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
-        :param: secret_name:     Name of the secret created by cert-manager
-        :param: usages:          List of X.509 key usages
-        :param: issuer_name:     Name of the cert-manager's Issuer or ClusterIssuer object
-
-        """
-        certificate_body = {
-            "apiVersion": "cert-manager.io/v1",
-            "kind": "Certificate",
-            "metadata": {"name": name, "namespace": namespace},
-            "spec": {
-                "secretName": secret_name,
-                "privateKey": {
-                    "rotationPolicy": "Always",
-                    "algorithm": "ECDSA",
-                    "size": 256,
-                },
-                "duration": "8760h",  # 1 Year
-                "renewBefore": "2208h",  # 9 months
-                "subject": {"organizations": ["osm"]},
-                "commonName": "osm",
-                "isCA": False,
-                "usages": usages,
-                "dnsNames": [
-                    "{}.{}".format(dns_prefix, namespace),
-                    "{}.{}.svc".format(dns_prefix, namespace),
-                    "{}.{}.svc.cluster".format(dns_prefix, namespace),
-                    "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
-                ],
-                "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
-            },
-        }
-        client = self.clients[CUSTOM_OBJECT_CLIENT]
-        try:
-            client.create_namespaced_custom_object(
-                group="cert-manager.io",
-                plural="certificates",
-                version="v1",
-                body=certificate_body,
-                namespace=namespace,
-            )
-        except ApiException as e:
-            info = json.loads(e.body)
-            if info.get("reason").lower() == "alreadyexists":
-                self.logger.warning("Certificate already exists: {}".format(e))
-            else:
-                raise e
-
-    async def delete_certificate(self, namespace, object_name):
-        client = self.clients[CUSTOM_OBJECT_CLIENT]
-        try:
-            client.delete_namespaced_custom_object(
-                group="cert-manager.io",
-                plural="certificates",
-                version="v1",
-                name=object_name,
-                namespace=namespace,
-            )
-        except ApiException as e:
-            info = json.loads(e.body)
-            if info.get("reason").lower() == "notfound":
-                self.logger.warning("Certificate already deleted: {}".format(e))
-            else:
-                raise e
-
-    @retry(
-        attempts=10,
-        delay=1,
-        fallback=Exception("Failed creating the namespace"),
-    )
-    async def create_namespace(self, name: str, labels: dict = None):
-        """
-        Create a namespace
-
-        :param: name:       Name of the namespace to be created
-        :param: labels:     Dictionary with labels for the new namespace
-
-        """
-        v1_core = self.clients[CORE_CLIENT]
-        metadata = V1ObjectMeta(name=name, labels=labels)
-        namespace = V1Namespace(
-            metadata=metadata,
-        )
-
-        try:
-            v1_core.create_namespace(namespace)
-            self.logger.debug("Namespace created: {}".format(name))
-        except ApiException as e:
-            info = json.loads(e.body)
-            if info.get("reason").lower() == "alreadyexists":
-                self.logger.warning("Namespace already exists: {}".format(e))
-            else:
-                raise e
-
-    @retry(
-        attempts=10,
-        delay=1,
-        fallback=Exception("Failed deleting the namespace"),
-    )
-    async def delete_namespace(self, name: str):
-        """
-        Delete a namespace
-
-        :param: name:       Name of the namespace to be deleted
-
-        """
-        try:
-            self.clients[CORE_CLIENT].delete_namespace(name)
-        except ApiException as e:
-            if e.reason == "Not Found":
-                self.logger.warning("Namespace already deleted: {}".format(e))
-
-    def get_secrets(
-        self,
-        namespace: str,
-        field_selector: str = None,
-    ) -> typing.List[typing.Dict]:
-        """
-        Get Secret list from a namespace
-
-        :param: namespace:  Kubernetes namespace
-        :param: field_selector:     Kubernetes field selector
-
-        :return: List of the secrets matching the selectors specified
-        """
-        try:
-            v1_core = self.clients[CORE_CLIENT]
-            secrets = v1_core.list_namespaced_secret(
-                namespace=namespace,
-                field_selector=field_selector,
-            ).items
-            return secrets
-        except ApiException as e:
-            self.logger.error("Error calling get secrets: {}".format(e))
-            raise e
-
-    def create_generic_object(
-        self,
-        api_group: str,
-        api_plural: str,
-        api_version: str,
-        namespace: str,
-        manifest_dict: dict,
-    ):
-        """
-        Creates generic object
-
-        :param: api_group:       API Group
-        :param: api_plural:      API Plural
-        :param: api_version:     API Version
-        :param: namespace:       Namespace
-        :param: manifest_dict:   Dictionary with the content of the Kubernetes manifest
-
-        """
-        client = self.clients[CUSTOM_OBJECT_CLIENT]
-        try:
-            client.create_namespaced_custom_object(
-                group=api_group,
-                plural=api_plural,
-                version=api_version,
-                body=manifest_dict,
-                namespace=namespace,
-            )
-        except ApiException as e:
-            info = json.loads(e.body)
-            if info.get("reason").lower() == "alreadyexists":
-                self.logger.warning("Object already exists: {}".format(e))
-            else:
-                raise e
-
-    def delete_generic_object(
-        self,
-        api_group: str,
-        api_plural: str,
-        api_version: str,
-        namespace: str,
-        name: str,
-    ):
-        """
-        Deletes generic object
-
-        :param: api_group:       API Group
-        :param: api_plural:      API Plural
-        :param: api_version:     API Version
-        :param: namespace:       Namespace
-        :param: name:            Name of the object
-
-        """
-        client = self.clients[CUSTOM_OBJECT_CLIENT]
-        try:
-            client.delete_namespaced_custom_object(
-                group=api_group,
-                plural=api_plural,
-                version=api_version,
-                name=name,
-                namespace=namespace,
-            )
-        except ApiException as e:
-            info = json.loads(e.body)
-            if info.get("reason").lower() == "notfound":
-                self.logger.warning("Object already deleted: {}".format(e))
-            else:
-                raise e
-
-    async def get_generic_object(
-        self,
-        api_group: str,
-        api_plural: str,
-        api_version: str,
-        namespace: str,
-        name: str,
-    ):
-        """
-        Gets generic object
-
-        :param: api_group:       API Group
-        :param: api_plural:      API Plural
-        :param: api_version:     API Version
-        :param: namespace:       Namespace
-        :param: name:            Name of the object
-
-        """
-        client = self.clients[CUSTOM_OBJECT_CLIENT]
-        try:
-            object_dict = client.list_namespaced_custom_object(
-                group=api_group,
-                plural=api_plural,
-                version=api_version,
-                namespace=namespace,
-                field_selector=f"metadata.name={name}",
-            )
-            if len(object_dict.get("items")) == 0:
-                return None
-            return object_dict.get("items")[0]
-        except ApiException as e:
-            info = json.loads(e.body)
-            if info.get("reason").lower() == "notfound":
-                self.logger.warning("Cannot get custom object: {}".format(e))
-            else:
-                raise e
-
-    async def list_generic_object(
-        self,
-        api_group: str,
-        api_plural: str,
-        api_version: str,
-        namespace: str,
-    ):
-        """
-        Lists all generic objects of the requested API group
-
-        :param: api_group:       API Group
-        :param: api_plural:      API Plural
-        :param: api_version:     API Version
-        :param: namespace:       Namespace
-
-        """
-        client = self.clients[CUSTOM_OBJECT_CLIENT]
-        try:
-            object_dict = client.list_namespaced_custom_object(
-                group=api_group,
-                plural=api_plural,
-                version=api_version,
-                namespace=namespace,
-            )
-            self.logger.debug(f"Object-list: {object_dict.get('items')}")
-            return object_dict.get("items")
-        except ApiException as e:
-            info = json.loads(e.body)
-            if info.get("reason").lower() == "notfound":
-                self.logger.warning(
-                    "Cannot retrieve list of custom objects: {}".format(e)
-                )
-            else:
-                raise e
-
-    @retry(
-        attempts=10,
-        delay=1,
-        fallback=Exception("Failed creating the secret"),
-    )
-    async def create_secret_string(
-        self, name: str, string_data: str, namespace: str, secret_type: str
-    ):
-        """
-        Create secret with data
-
-        :param: name:        Name of the secret
-        :param: string_data: String with data content
-        :param: namespace:   Name of the namespace where the secret will be stored
-        :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
-
-        :return: None
-        """
-        v1_core = self.clients[CORE_CLIENT]
-        metadata = V1ObjectMeta(name=name, namespace=namespace)
-        secret = V1Secret(metadata=metadata, string_data=string_data, type=secret_type)
-        v1_core.create_namespaced_secret(namespace, secret)
index e46cdf7..bb3fce2 100644 (file)
@@ -17,8 +17,7 @@
 import logging
 from osm_lcm.lcm_utils import LcmBase
 
-# from n2vc import kubectl
-from osm_lcm.odu_libs import kubectl
+from n2vc import kubectl
 
 
 class OduWorkflow(LcmBase):