return resource_status, db_content
async def common_check_list(
- self, op_id, checkings_list, db_collection, db_item, kubectl=None
+ self, op_id, checkings_list, db_collection, db_item, kubectl_obj=None
):
try:
for checking in checkings_list:
condition=checking.get("condition"),
deleted=checking.get("deleted", False),
timeout=checking["timeout"],
- kubectl=kubectl,
+ kubectl_obj=kubectl_obj,
)
if not status:
error_message = "Resources not ready: "
f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
)
result, message = await self.common_check_list(
- op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
+ op_id, checkings_list, "ksus", db_ksu, kubectl_obj=cluster_kubectl
)
if not result:
return False, message
f"Checking status of KSU {db_ksu['name']} for profile {profile_id}."
)
result, message = await self.common_check_list(
- op_id, checkings_list, "ksus", db_ksu, kubectl=cluster_kubectl
+ op_id, checkings_list, "ksus", db_ksu, kubectl_obj=cluster_kubectl
)
if not result:
return False, message
self.task_registry[topic][_id][op_id] = {task_name: task}
else:
self.task_registry[topic][_id][op_id][task_name] = task
- self.logger.info("Task resgistry: {}".format(self.task_registry))
+ self.logger.info("Task registry: {}".format(self.task_registry))
# print("registering task", topic, _id, op_id, task_name, task)
def remove(self, topic, _id, op_id, task_name=None):
from distutils.version import LooseVersion
-from kubernetes import client, config
+from kubernetes import client as kclient, config as kconfig
from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
class Kubectl:
def __init__(self, config_file=None):
- config.load_kube_config(config_file=config_file)
+ self.logger = logging.getLogger("lcm.kubectl")
+ self._config_file = config_file
+ self.logger.info(f"Kubectl cfg file: {config_file}")
+ # kconfig.load_kube_config(config_file=config_file)
+ self._api_client = kconfig.new_client_from_config(config_file=config_file)
self._clients = {
- CORE_CLIENT: client.CoreV1Api(),
- RBAC_CLIENT: client.RbacAuthorizationV1Api(),
- STORAGE_CLIENT: client.StorageV1Api(),
- CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
+ CORE_CLIENT: kclient.CoreV1Api(api_client=self._api_client),
+ RBAC_CLIENT: kclient.RbacAuthorizationV1Api(api_client=self._api_client),
+ STORAGE_CLIENT: kclient.StorageV1Api(api_client=self._api_client),
+ CUSTOM_OBJECT_CLIENT: kclient.CustomObjectsApi(api_client=self._api_client),
}
- self._configuration = config.kube_config.Configuration.get_default_copy()
- self.logger = logging.getLogger("lcm.kubectl")
+ self._configuration = self._api_client.configuration.get_default_copy()
+ self.logger.info(f"Kubectl cfg file: {config_file}")
+ self.logger.info(f"Kubectl self configuration Host: {self._configuration.host}")
@property
def configuration(self):
self.clients[RBAC_CLIENT].delete_cluster_role(name)
def _get_kubectl_version(self):
- version = VersionApi().get_code()
+ self.logger.debug("Enter _get_kubectl_version function")
+ version = VersionApi(api_client=self._api_client).get_code()
return "{}.{}".format(version.major, version.minor)
def _need_to_create_new_secret(self):
:param: namespace: Kubernetes namespace
Default: kube-system
"""
+ self.logger.debug(f"Kubectl cfg file: {self._config_file}")
+ self.logger.debug(
+ f"Kubectl self configuration Host: {self._configuration.host}"
+ )
self.clients[CORE_CLIENT].delete_namespaced_secret(name, namespace)
def delete_service_account(self, name: str, namespace: str = "kube-system"):
)
async def get_secret_data(
self, name: str, namespace: str = "kube-system"
- ) -> (str, str):
+ ) -> typing.Tuple[str, str]:
"""
Get secret data
:return: Dictionary with secret's data
"""
+ self.logger.debug(f"Kubectl cfg file: {self._config_file}")
+ self.logger.debug(
+ f"Kubectl self configuration Host: {self._configuration.host}"
+ )
v1_core = self.clients[CORE_CLIENT]
secret = v1_core.read_namespaced_secret(name, namespace)
:return: None
"""
- self.logger.info("Enter create_secret function")
+ self.logger.debug(f"Kubectl cfg file: {self._config_file}")
+ self.logger.debug(
+ f"Kubectl self configuration Host: {self._configuration.host}"
+ )
+ self.logger.debug("Enter create_secret function")
v1_core = self.clients[CORE_CLIENT]
- self.logger.info(f"v1_core: {v1_core}")
+ self.logger.debug(f"v1_core: {v1_core}")
metadata = V1ObjectMeta(name=name, namespace=namespace)
- self.logger.info(f"metadata: {metadata}")
+ self.logger.debug(f"metadata: {metadata}")
secret = V1Secret(metadata=metadata, data=data, type=secret_type)
- self.logger.info(f"secret: {secret}")
+ self.logger.debug(f"secret: {secret}")
v1_core.create_namespaced_secret(namespace, secret)
self.logger.info("Namespaced secret was created")
:param: manifest_dict: Dictionary with the content of the Kubernetes manifest
"""
+ self.logger.debug(f"Kubectl cfg file: {self._config_file}")
+ self.logger.debug(
+ f"Kubectl self configuration Host: {self._configuration.host}"
+ )
client = self.clients[CUSTOM_OBJECT_CLIENT]
try:
if namespace:
:param: name: Name of the object
"""
+ self.logger.debug(f"Kubectl cfg file: {self._config_file}")
+ self.logger.debug(
+ f"Kubectl self configuration Host: {self._configuration.host}"
+ )
client = self.clients[CUSTOM_OBJECT_CLIENT]
try:
if namespace:
:param: name: Name of the object
"""
+ self.logger.debug(f"Kubectl cfg file: {self._config_file}")
+ self.logger.debug(
+ f"Kubectl self configuration Host: {self._configuration.host}"
+ )
client = self.clients[CUSTOM_OBJECT_CLIENT]
try:
if namespace:
:param: namespace: Namespace
"""
+ self.logger.debug(f"Kubectl cfg file: {self._config_file}")
+ self.logger.debug(
+ f"Kubectl self configuration Host: {self._configuration.host}"
+ )
client = self.clients[CUSTOM_OBJECT_CLIENT]
try:
if namespace:
pod = V1Pod(
api_version="v1",
kind="Pod",
- metadata=client.V1ObjectMeta(name=name),
+ metadata=kclient.V1ObjectMeta(name=name),
spec=V1PodSpec(
containers=[
V1Container(
class GetServices(TestCase):
- @mock.patch("osm_lcm.n2vc.kubectl.config.load_kube_config")
- @mock.patch("osm_lcm.n2vc.kubectl.client.CoreV1Api")
+ @mock.patch("osm_lcm.n2vc.kubectl.kconfig.new_client_from_config")
+ @mock.patch("osm_lcm.n2vc.kubectl.kclient.CoreV1Api")
def setUp(self, mock_core, mock_config):
mock_core.return_value = mock.MagicMock()
mock_config.return_value = mock.MagicMock()
self.kubectl = Kubectl()
- @mock.patch("osm_lcm.n2vc.kubectl.client.CoreV1Api")
+ @mock.patch("osm_lcm.n2vc.kubectl.kclient.CoreV1Api")
def test_get_service(self, mock_corev1api):
mock_corev1api.return_value = FakeCoreV1Api()
services = self.kubectl.get_services(
self.kubectl.get_services()
-@mock.patch("osm_lcm.n2vc.kubectl.client")
-@mock.patch("osm_lcm.n2vc.kubectl.config.kube_config.Configuration.get_default_copy")
-@mock.patch("osm_lcm.n2vc.kubectl.config.load_kube_config")
+@mock.patch("osm_lcm.n2vc.kubectl.kclient")
+@mock.patch("osm_lcm.n2vc.kubectl.kconfig.new_client_from_config")
class GetConfiguration(KubectlTestCase):
def setUp(self):
super(GetConfiguration, self).setUp()
def test_get_configuration(
self,
- mock_load_kube_config,
- mock_configuration,
- mock_client,
+ mock_new_client_from_config,
+ mock_kclient,
):
kubectl = Kubectl()
kubectl.configuration
- mock_configuration.assert_called_once()
- mock_load_kube_config.assert_called_once()
- mock_client.CoreV1Api.assert_called_once()
- mock_client.RbacAuthorizationV1Api.assert_called_once()
- mock_client.StorageV1Api.assert_called_once()
+ mock_new_client_from_config.assert_called_once()
+ mock_kclient.CoreV1Api.assert_called_once()
+ mock_kclient.RbacAuthorizationV1Api.assert_called_once()
+ mock_kclient.StorageV1Api.assert_called_once()
+ mock_kclient.CustomObjectsApi.assert_called_once()
@mock.patch("kubernetes.client.StorageV1Api.list_storage_class")
-@mock.patch("kubernetes.config.load_kube_config")
+@mock.patch("kubernetes.config.new_client_from_config")
class GetDefaultStorageClass(KubectlTestCase):
def setUp(self):
super(GetDefaultStorageClass, self).setUp()
)
def test_get_default_storage_class_exists_default(
- self, mock_load_kube_config, mock_list_storage_class
+ self,
+ mock_new_client_from_config,
+ mock_list_storage_class,
):
kubectl = Kubectl()
items = [self.default_sc]
mock_list_storage_class.assert_called_once()
def test_get_default_storage_class_exists_default_old(
- self, mock_load_kube_config, mock_list_storage_class
+ self,
+ mock_new_client_from_config,
+ mock_list_storage_class,
):
kubectl = Kubectl()
items = [self.default_sc_old]
mock_list_storage_class.assert_called_once()
def test_get_default_storage_class_none(
- self, mock_load_kube_config, mock_list_storage_class
+ self,
+ mock_new_client_from_config,
+ mock_list_storage_class,
):
kubectl = Kubectl()
mock_list_storage_class.return_value = FakeK8sStorageClassesList(items=[])
mock_list_storage_class.assert_called_once()
def test_get_default_storage_class_exists_not_default(
- self, mock_load_kube_config, mock_list_storage_class
+ self,
+ mock_new_client_from_config,
+ mock_list_storage_class,
):
kubectl = Kubectl()
items = [self.sc]
mock_list_storage_class.assert_called_once()
def test_get_default_storage_class_choose(
- self, mock_load_kube_config, mock_list_storage_class
+ self,
+ mock_new_client_from_config,
+ mock_list_storage_class,
):
kubectl = Kubectl()
items = [self.sc, self.default_sc]
@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_service_account")
@mock.patch("kubernetes.client.CoreV1Api.list_namespaced_service_account")
class CreateServiceAccountClass(KubectlTestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(self, mock_new_client_from_config):
super(CreateServiceAccountClass, self).setUp()
self.service_account_name = "Service_account"
self.labels = {"Key1": "Value1", "Key2": "Value2"}
@mock.patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object")
class CreateCertificateClass(asynctest.TestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(
+ self,
+ mock_new_client_from_config,
+ ):
super(CreateCertificateClass, self).setUp()
self.namespace = "osm"
self.name = "test-cert"
@mock.patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
class DeleteCertificateClass(asynctest.TestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(self, mock_new_client_from_config):
super(DeleteCertificateClass, self).setUp()
self.namespace = "osm"
self.object_name = "test-cert"
@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role")
@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role")
class CreateRoleClass(asynctest.TestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(self, mock_new_client_from_config):
super(CreateRoleClass, self).setUp()
self.name = "role"
self.namespace = "osm"
@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role_binding")
@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role_binding")
class CreateRoleBindingClass(asynctest.TestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(self, mock_new_client_from_config):
super(CreateRoleBindingClass, self).setUp()
self.name = "rolebinding"
self.namespace = "osm"
@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret")
class CreateSecretClass(asynctest.TestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(self, mock_new_client_from_config):
super(CreateSecretClass, self).setUp()
self.name = "secret"
self.namespace = "osm"
@mock.patch("kubernetes.client.CoreV1Api.create_namespace")
class CreateNamespaceClass(asynctest.TestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(self, mock_new_client_from_config):
super(CreateNamespaceClass, self).setUp()
self.namespace = "osm"
self.labels = {"key": "value"}
@mock.patch("kubernetes.client.CoreV1Api.delete_namespace")
class DeleteNamespaceClass(asynctest.TestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(self, mock_new_client_from_config):
super(DeleteNamespaceClass, self).setUp()
self.namespace = "osm"
self.kubectl = Kubectl()
@mock.patch("kubernetes.client.CoreV1Api.read_namespaced_secret")
class GetSecretContentClass(asynctest.TestCase):
- @mock.patch("kubernetes.config.load_kube_config")
- def setUp(self, mock_load_kube_config):
+ @mock.patch("kubernetes.config.new_client_from_config")
+ def setUp(self, mock_new_client_from_config):
super(GetSecretContentClass, self).setUp()
self.name = "my_secret"
self.namespace = "osm"
# self.logger.debug(f"private_key_new_cluster={private_key_new_cluster}")
# Test kubectl connection
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(f"Testing kubectl configuration: {self._kubectl.configuration}")
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
self.logger.debug(self._kubectl._get_kubectl_version())
# Create temporal secret with agekey
secret_key = "agekey"
secret_value = private_key_new_cluster
try:
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(
+ f"Testing kubectl configuration: {self._kubectl.configuration}"
+ )
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
await self.create_secret(
secret_name,
secret_namespace,
self.logger.debug(f"Workflow manifest: {manifest}")
# Submit workflow
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(f"Testing kubectl configuration: {self._kubectl.configuration}")
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
secret_key = "agekey"
secret_value = private_key_cluster
try:
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(
+ f"Testing kubectl configuration: {self._kubectl.configuration}"
+ )
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
await self.create_secret(
secret_name,
secret_namespace,
self.logger.info(manifest)
# Submit workflow
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(f"Testing kubectl configuration: {self._kubectl.configuration}")
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
self.logger.info(manifest)
# Submit workflow
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(f"Testing kubectl configuration: {self._kubectl.configuration}")
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
secret_key = "agekey"
secret_value = private_key_new_cluster
try:
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(
+ f"Testing kubectl configuration: {self._kubectl.configuration}"
+ )
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
await self.create_secret(
secret_name,
secret_namespace,
db_cluster["credentials"], indent=2, default_flow_style=False, sort_keys=False
)
try:
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(
+ f"Testing kubectl configuration: {self._kubectl.configuration}"
+ )
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
await self.create_secret(
secret_name2,
secret_namespace2,
self.logger.debug(f"Workflow manifest: {manifest}")
# Submit workflow
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(f"Testing kubectl configuration: {self._kubectl.configuration}")
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
self.logger.info(manifest)
# Submit workflow
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(f"Testing kubectl configuration: {self._kubectl.configuration}")
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
self._kubectl.create_generic_object(
namespace="osm-workflows",
manifest_dict=yaml.safe_load(manifest),
async def readiness_loop(
- self, op_id, item, name, namespace, condition, deleted, timeout, kubectl=None
+ self, op_id, item, name, namespace, condition, deleted, timeout, kubectl_obj=None
):
- if kubectl is None:
- kubectl = self._kubectl
- self.logger.info("Op {op_id} readiness_loop Enter")
+ if kubectl_obj is None:
+ kubectl_obj = self._kubectl
+ self.logger.info("readiness_loop Enter")
self.logger.info(
f"Op {op_id}. {item} {name}. Namespace: '{namespace}'. Condition: {condition}. Deleted: {deleted}. Timeout: {timeout}"
)
iteration_prefix = f"Op {op_id}. Iteration {counter}/{max_iterations}"
try:
self.logger.info(f"Op {op_id}. Iteration {counter}/{max_iterations}")
- generic_object = await kubectl.get_generic_object(
+ generic_object = await kubectl_obj.get_generic_object(
api_group=api_group,
api_plural=api_plural,
api_version=api_version,
name = secret["name"]
namespace = secret["namespace"]
self.logger.info(f"Deleting secret {name} in namespace {namespace}")
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(
+ f"Testing kubectl configuration: {self._kubectl.configuration}"
+ )
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
self.delete_secret(name, namespace)
# Delete pvcs
for pvc in items.get("pvcs", []):
name = pvc["name"]
namespace = pvc["namespace"]
self.logger.info(f"Deleting pvc {name} in namespace {namespace}")
+ self.logger.debug(f"Testing kubectl: {self._kubectl}")
+ self.logger.debug(
+ f"Testing kubectl configuration: {self._kubectl.configuration}"
+ )
+ self.logger.debug(
+ f"Testing kubectl configuration Host: {self._kubectl.configuration.host}"
+ )
await self._kubectl.delete_pvc(name, namespace)