X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=176d9fc043eed27bcf4ba7e09f1cc485c449fa76;hb=81a934e0b3d778de49ea48a8b0310f0052be821d;hp=b3d7bda3c83f40812ccf03d2a196ea247099dc48;hpb=82b591ceed704c798ead2d9104085a08e75b511b;p=osm%2FN2VC.git diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index b3d7bda..176d9fc 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -21,16 +21,17 @@ ## import abc import asyncio +from typing import Union import random import time import shlex import shutil import stat -import subprocess import os import yaml from uuid import uuid4 +from n2vc.config import EnvironConfig from n2vc.exceptions import K8sException from n2vc.k8s_conn import K8sConnector @@ -44,7 +45,6 @@ class K8sHelmBaseConnector(K8sConnector): """ service_account = "osm" - _STABLE_REPO_URL = "https://charts.helm.sh/stable" def __init__( self, @@ -54,7 +54,6 @@ class K8sHelmBaseConnector(K8sConnector): helm_command: str = "/usr/bin/helm", log: object = None, on_update_db=None, - vca_config: dict = None, ): """ @@ -71,6 +70,7 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Initializing K8S Helm connector") + self.config = EnvironConfig() # random numbers for release name generation random.seed(time.time()) @@ -86,19 +86,22 @@ class K8sHelmBaseConnector(K8sConnector): self._check_file_exists(filename=helm_command, exception_if_not_exists=True) # obtain stable repo url from config or apply default - if not vca_config or not vca_config.get("stablerepourl"): - self._stable_repo_url = self._STABLE_REPO_URL - else: - self._stable_repo_url = vca_config.get("stablerepourl") + self._stable_repo_url = self.config.get("stablerepourl") + if self._stable_repo_url == "None": + self._stable_repo_url = None - @staticmethod - def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str): + def _get_namespace(self, cluster_uuid: str) -> str: """ - Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only - cluster_id for backward compatibility + Obtains the namespace used by the cluster with the uuid passed by argument + + param: cluster_uuid: cluster's uuid """ - namespace, _, cluster_id = cluster_uuid.rpartition(":") - return namespace, cluster_id + + # first, obtain the cluster corresponding to the uuid passed by argument + k8scluster = self.db.get_one( + "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False + ) + return k8scluster.get("namespace") async def init_env( self, @@ -122,11 +125,9 @@ class K8sHelmBaseConnector(K8sConnector): """ if reuse_cluster_uuid: - namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid) - namespace = namespace_ or namespace + cluster_id = reuse_cluster_uuid else: cluster_id = str(uuid4()) - cluster_uuid = "{}:{}".format(namespace, cluster_id) self.log.debug( "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace) @@ -148,42 +149,70 @@ class K8sHelmBaseConnector(K8sConnector): self.log.info("Cluster {} initialized".format(cluster_id)) - return cluster_uuid, n2vc_installed_sw + return cluster_id, n2vc_installed_sw async def repo_add( self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart" ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "Cluster {}, adding {} repository {}. URL: {}".format( - cluster_id, repo_type, name, url + cluster_uuid, repo_type, name, url ) ) - # sync local dir - self.fs.sync(from_path=cluster_id) - # init_env paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + # helm repo add name url + command = "env KUBECONFIG={} {} repo add {} {}".format( + paths["kube_config"], self._helm_command, name, url + ) + self.log.debug("adding repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env ) # helm repo update - command = "{} repo update".format(self._helm_command) + command = "env KUBECONFIG={} {} repo update {}".format( + paths["kube_config"], self._helm_command, name + ) self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) - # helm repo add name url - command = "{} repo add {} {}".format(self._helm_command, name, url) - self.log.debug("adding repo: {}".format(command)) + # sync fs + self.fs.reverse_sync(from_path=cluster_uuid) + + async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"): + self.log.debug( + "Cluster {}, updating {} repository {}".format( + cluster_uuid, repo_type, name + ) + ) + + # init_env + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + # helm repo update + command = "{} repo update {}".format(self._helm_command, name) + self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( - command=command, raise_exception_on_error=True, env=env + command=command, raise_exception_on_error=False, env=env ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def repo_list(self, cluster_uuid: str) -> list: """ @@ -192,18 +221,19 @@ class K8sHelmBaseConnector(K8sConnector): :return: list of registered repositories: [ (name, url) .... ] """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list repositories for cluster {}".format(cluster_id)) - - # sync local dir - self.fs.sync(from_path=cluster_id) + self.log.debug("list repositories for cluster {}".format(cluster_uuid)) # config filename paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) - command = "{} repo list --output yaml".format(self._helm_command) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + command = "env KUBECONFIG={} {} repo list --output yaml".format( + paths["kube_config"], self._helm_command + ) # Set exception to false because if there are no repos just want an empty list output, _rc = await self._local_async_exec( @@ -211,7 +241,7 @@ class K8sHelmBaseConnector(K8sConnector): ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) if _rc == 0: if output and len(output) > 0: @@ -224,25 +254,27 @@ class K8sHelmBaseConnector(K8sConnector): return [] async def repo_remove(self, cluster_uuid: str, name: str): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id)) - - # sync local dir - self.fs.sync(from_path=cluster_id) + self.log.debug( + "remove {} repositories for cluster {}".format(name, cluster_uuid) + ) # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) - command = "{} repo remove {}".format(self._helm_command, name) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + command = "env KUBECONFIG={} {} repo remove {}".format( + paths["kube_config"], self._helm_command, name + ) await self._local_async_exec( command=command, raise_exception_on_error=True, env=env ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) async def reset( self, @@ -261,15 +293,15 @@ class K8sHelmBaseConnector(K8sConnector): :param kwargs: Additional parameters (None yet) :return: Returns True if successful or raises an exception. """ - namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid) + namespace = self._get_namespace(cluster_uuid=cluster_uuid) self.log.debug( "Resetting K8s environment. cluster uuid: {} uninstall={}".format( - cluster_id, uninstall_sw + cluster_uuid, uninstall_sw ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # uninstall releases if needed. if uninstall_sw: @@ -298,20 +330,20 @@ class K8sHelmBaseConnector(K8sConnector): else: msg = ( "Cluster uuid: {} has releases and not force. Leaving K8s helm environment" - ).format(cluster_id) + ).format(cluster_uuid) self.log.warn(msg) uninstall_sw = ( False # Allow to remove k8s cluster without removing Tiller ) if uninstall_sw: - await self._uninstall_sw(cluster_id, namespace) + await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace) # delete cluster directory - self.log.debug("Removing directory {}".format(cluster_id)) - self.fs.file_delete(cluster_id, ignore_non_exist=True) + self.log.debug("Removing directory {}".format(cluster_uuid)) + self.fs.file_delete(cluster_uuid, ignore_non_exist=True) # Remove also local directorio if still exist - direct = self.fs.path + "/" + cluster_id + direct = self.fs.path + "/" + cluster_uuid shutil.rmtree(direct, ignore_errors=True) return True @@ -330,6 +362,11 @@ class K8sHelmBaseConnector(K8sConnector): kdu_name: str = None, namespace: str = None, ): + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + # params to str params_str, file_to_delete = self._params_to_file_option( cluster_id=cluster_id, params=params @@ -343,8 +380,19 @@ class K8sHelmBaseConnector(K8sConnector): version = str(parts[1]) kdu_model = parts[0] + repo = self._split_repo(kdu_model) + if repo: + self.repo_update(cluster_id, repo) + command = self._get_install_command( - kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + paths["kube_config"], ) self.log.debug("installing: {}".format(command)) @@ -413,11 +461,10 @@ class K8sHelmBaseConnector(K8sConnector): params: dict = None, db_dict: dict = None, ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id)) + self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -426,12 +473,15 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + # params to str params_str, file_to_delete = self._params_to_file_option( - cluster_id=cluster_id, params=params + cluster_id=cluster_uuid, params=params ) # version @@ -442,6 +492,10 @@ class K8sHelmBaseConnector(K8sConnector): version = str(parts[1]) kdu_model = parts[0] + repo = self._split_repo(kdu_model) + if repo: + self.repo_update(cluster_uuid, repo) + command = self._get_upgrade_command( kdu_model, kdu_instance, @@ -450,6 +504,7 @@ class K8sHelmBaseConnector(K8sConnector): version, atomic, timeout, + paths["kube_config"], ) self.log.debug("upgrading: {}".format(command)) @@ -465,7 +520,7 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -493,7 +548,7 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -508,7 +563,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -542,16 +597,14 @@ class K8sHelmBaseConnector(K8sConnector): async def rollback( self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): - - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "rollback kdu_instance {} to revision {} from cluster {}".format( - kdu_instance, revision, cluster_id + kdu_instance, revision, cluster_uuid ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) @@ -560,11 +613,14 @@ class K8sHelmBaseConnector(K8sConnector): # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + command = self._get_rollback_command( - kdu_instance, instance_info["namespace"], revision + kdu_instance, instance_info["namespace"], revision, paths["kube_config"] ) self.log.debug("rolling_back: {}".format(command)) @@ -578,7 +634,7 @@ class K8sHelmBaseConnector(K8sConnector): # write status in another task status_task = asyncio.ensure_future( coro_or_future=self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -597,7 +653,7 @@ class K8sHelmBaseConnector(K8sConnector): # write final status await self._store_status( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance_info["namespace"], db_dict=db_dict, @@ -612,7 +668,7 @@ class K8sHelmBaseConnector(K8sConnector): raise K8sException(msg) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) # return new revision number instance = await self.get_instance_info( @@ -637,31 +693,37 @@ class K8sHelmBaseConnector(K8sConnector): :return: True if successful """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( - "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id) + "uninstall kdu_instance {} from cluster {}".format( + kdu_instance, cluster_uuid + ) ) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) if not instance_info: - raise K8sException("kdu_instance {} not found".format(kdu_instance)) - + self.log.warning(("kdu_instance {} not found".format(kdu_instance))) + return True # init env, paths paths, env = self._init_paths_env( - cluster_name=cluster_id, create_if_not_exist=True + cluster_name=cluster_uuid, create_if_not_exist=True ) - command = self._get_uninstall_command(kdu_instance, instance_info["namespace"]) + # sync local dir + self.fs.sync(from_path=cluster_uuid) + + command = self._get_uninstall_command( + kdu_instance, instance_info["namespace"], paths["kube_config"] + ) output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=True, env=env ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return self._output_to_table(output) @@ -673,17 +735,16 @@ class K8sHelmBaseConnector(K8sConnector): :return: """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - self.log.debug("list releases for cluster {}".format(cluster_id)) + self.log.debug("list releases for cluster {}".format(cluster_uuid)) # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # execute internal command - result = await self._instances_list(cluster_id) + result = await self._instances_list(cluster_uuid) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return result @@ -741,26 +802,32 @@ class K8sHelmBaseConnector(K8sConnector): - `external_ip` List of external ips (in case they are available) """ - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "get_services: cluster_uuid: {}, kdu_instance: {}".format( cluster_uuid, kdu_instance ) ) + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_uuid, create_if_not_exist=True + ) + # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get list of services names for kdu - service_names = await self._get_services(cluster_id, kdu_instance, namespace) + service_names = await self._get_services( + cluster_uuid, kdu_instance, namespace, paths["kube_config"] + ) service_list = [] for service in service_names: - service = await self._get_service(cluster_id, service, namespace) + service = await self._get_service(cluster_uuid, service, namespace) service_list.append(service) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service_list @@ -774,19 +841,19 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) - service = await self._get_service(cluster_id, service_name, namespace) + service = await self._get_service(cluster_uuid, service_name, namespace) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return service - async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str: + async def status_kdu( + self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs + ) -> Union[str, dict]: """ This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve the _composition_ (i.e. K8s objects) and _specific @@ -796,6 +863,8 @@ class K8sHelmBaseConnector(K8sConnector): :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance :param kwargs: Additional parameters (None yet) + :param yaml_format: if the return shall be returned as an YAML string or as a + dictionary :return: If successful, it will return the following vector of arguments: - K8s `namespace` in the cluster where the KDU lives - `state` of the KDU instance. It can be: @@ -816,13 +885,11 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) - # sync local dir - self.fs.sync(from_path=cluster_id) + self.fs.sync(from_path=cluster_uuid) # get instance: needed to obtain namespace - instances = await self._instances_list(cluster_id=cluster_id) + instances = await self._instances_list(cluster_id=cluster_uuid) for instance in instances: if instance.get("name") == kdu_instance: break @@ -830,20 +897,20 @@ class K8sHelmBaseConnector(K8sConnector): # instance does not exist raise K8sException( "Instance name: {} not found in cluster: {}".format( - kdu_instance, cluster_id + kdu_instance, cluster_uuid ) ) status = await self._status_kdu( - cluster_id=cluster_id, + cluster_id=cluster_uuid, kdu_instance=kdu_instance, namespace=instance["namespace"], + yaml_format=yaml_format, show_error_log=True, - return_text=True, ) # sync fs - self.fs.reverse_sync(from_path=cluster_id) + self.fs.reverse_sync(from_path=cluster_uuid) return status @@ -972,7 +1039,7 @@ class K8sHelmBaseConnector(K8sConnector): """ @abc.abstractmethod - async def _get_services(self, cluster_id, kdu_instance, namespace): + async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig): """ Implements the helm version dependent method to obtain services from a helm instance """ @@ -983,16 +1050,24 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): + ) -> Union[str, dict]: """ Implements the helm version dependent method to obtain status of a helm instance """ @abc.abstractmethod def _get_install_command( - self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, ) -> str: """ Obtain command to be executed to delete the indicated instance @@ -1000,20 +1075,32 @@ class K8sHelmBaseConnector(K8sConnector): @abc.abstractmethod def _get_upgrade_command( - self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, ) -> str: """ Obtain command to be executed to upgrade the indicated instance """ @abc.abstractmethod - def _get_rollback_command(self, kdu_instance, namespace, revision) -> str: + def _get_rollback_command( + self, kdu_instance, namespace, revision, kubeconfig + ) -> str: """ Obtain command to be executed to rollback the indicated instance """ @abc.abstractmethod - def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str: + def _get_uninstall_command( + self, kdu_instance: str, namespace: str, kubeconfig: str + ) -> str: """ Obtain command to be executed to delete the indicated instance """ @@ -1137,27 +1224,12 @@ class K8sHelmBaseConnector(K8sConnector): of dictionaries """ new_list = [] - for dictionary in input_list: - new_dict = dict((k.lower(), v) for k, v in dictionary.items()) - new_list.append(new_dict) + if input_list: + for dictionary in input_list: + new_dict = dict((k.lower(), v) for k, v in dictionary.items()) + new_list.append(new_dict) return new_list - def _local_exec(self, command: str) -> (str, int): - command = self._remove_multiple_spaces(command) - self.log.debug("Executing sync local command: {}".format(command)) - # raise exception if fails - output = "" - try: - output = subprocess.check_output( - command, shell=True, universal_newlines=True - ) - return_code = 0 - self.log.debug(output) - except Exception: - return_code = 1 - - return output, return_code - async def _local_async_exec( self, command: str, @@ -1392,8 +1464,8 @@ class K8sHelmBaseConnector(K8sConnector): detailed_status = await self._status_kdu( cluster_id=cluster_id, kdu_instance=kdu_instance, + yaml_format=False, namespace=namespace, - return_text=False, ) status = detailed_status.get("info").get("description") self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status)) @@ -1500,3 +1572,10 @@ class K8sHelmBaseConnector(K8sConnector): name = name + get_random_number() return name.lower() + + async def _split_repo(self, kdu_model: str) -> str: + repo_name = None + idx = kdu_model.find("/") + if idx >= 0: + repo_name = kdu_model[:idx] + return repo_name