X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fk8s_helm_base_conn.py;h=149b064ba5ed57fb5fbccac5ae7705eba074d07f;hp=ad59e8b1232d791b4937b980faecf9d99dc0b252;hb=60a3a96717d7c36ba7a65573da59a6bc039f5e28;hpb=f4c2ac82b78811a16582cc8eb90c6fe69dadd3f0 diff --git a/n2vc/k8s_helm_base_conn.py b/n2vc/k8s_helm_base_conn.py index ad59e8b..149b064 100644 --- a/n2vc/k8s_helm_base_conn.py +++ b/n2vc/k8s_helm_base_conn.py @@ -21,12 +21,12 @@ ## import abc import asyncio +from typing import Union import random import time import shlex import shutil import stat -import subprocess import os import yaml from uuid import uuid4 @@ -159,26 +159,53 @@ class K8sHelmBaseConnector(K8sConnector): ) ) - # sync local dir - self.fs.sync(from_path=cluster_id) - # init_env paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + + # helm repo add name url + command = "env KUBECONFIG={} {} repo add {} {}".format( + paths["kube_config"], self._helm_command, name, url + ) + self.log.debug("adding repo: {}".format(command)) + await self._local_async_exec( + command=command, raise_exception_on_error=True, env=env + ) + # helm repo update - command = "{} repo update".format(self._helm_command) + command = "env KUBECONFIG={} {} repo update {}".format( + paths["kube_config"], self._helm_command, name + ) self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) - # helm repo add name url - command = "{} repo add {} {}".format(self._helm_command, name, url) - self.log.debug("adding repo: {}".format(command)) + # sync fs + self.fs.reverse_sync(from_path=cluster_id) + + async def repo_update(self, cluster_id: str, name: str, repo_type: str = "chart"): + self.log.debug( + "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name) + ) + + # init_env + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + + # sync local dir + self.fs.sync(from_path=cluster_id) + + # helm repo update + command = "{} repo update {}".format(self._helm_command, name) + self.log.debug("updating repo: {}".format(command)) await self._local_async_exec( - command=command, raise_exception_on_error=True, env=env + command=command, raise_exception_on_error=False, env=env ) # sync fs @@ -194,15 +221,17 @@ class K8sHelmBaseConnector(K8sConnector): _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug("list repositories for cluster {}".format(cluster_id)) - # sync local dir - self.fs.sync(from_path=cluster_id) - # config filename paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) - command = "{} repo list --output yaml".format(self._helm_command) + # sync local dir + self.fs.sync(from_path=cluster_id) + + command = "env KUBECONFIG={} {} repo list --output yaml".format( + paths["kube_config"], self._helm_command + ) # Set exception to false because if there are no repos just want an empty list output, _rc = await self._local_async_exec( @@ -223,19 +252,20 @@ class K8sHelmBaseConnector(K8sConnector): return [] async def repo_remove(self, cluster_uuid: str, name: str): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id)) - # sync local dir - self.fs.sync(from_path=cluster_id) - # init env, paths paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) - command = "{} repo remove {}".format(self._helm_command, name) + # sync local dir + self.fs.sync(from_path=cluster_id) + + command = "env KUBECONFIG={} {} repo remove {}".format( + paths["kube_config"], self._helm_command, name + ) await self._local_async_exec( command=command, raise_exception_on_error=True, env=env ) @@ -329,6 +359,11 @@ class K8sHelmBaseConnector(K8sConnector): kdu_name: str = None, namespace: str = None, ): + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + # params to str params_str, file_to_delete = self._params_to_file_option( cluster_id=cluster_id, params=params @@ -342,8 +377,19 @@ class K8sHelmBaseConnector(K8sConnector): version = str(parts[1]) kdu_model = parts[0] + repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_id, repo) + command = self._get_install_command( - kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + paths["kube_config"], ) self.log.debug("installing: {}".format(command)) @@ -364,7 +410,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=False, ) ) @@ -377,7 +422,6 @@ class K8sHelmBaseConnector(K8sConnector): output, rc = exec_task.result() else: - output, rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) @@ -393,8 +437,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=namespace, db_dict=db_dict, operation="install", - run_once=True, - check_every=0, ) if rc != 0: @@ -428,6 +470,9 @@ class K8sHelmBaseConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + # params to str params_str, file_to_delete = self._params_to_file_option( cluster_id=cluster_id, params=params @@ -441,6 +486,10 @@ class K8sHelmBaseConnector(K8sConnector): version = str(parts[1]) kdu_model = parts[0] + repo = self._split_repo(kdu_model) + if repo: + await self.repo_update(cluster_id, repo) + command = self._get_upgrade_command( kdu_model, kdu_instance, @@ -449,12 +498,12 @@ class K8sHelmBaseConnector(K8sConnector): version, atomic, timeout, + paths["kube_config"], ) self.log.debug("upgrading: {}".format(command)) if atomic: - # exec helm in a task exec_task = asyncio.ensure_future( coro_or_future=self._local_async_exec( @@ -469,7 +518,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="upgrade", - run_once=False, ) ) @@ -481,7 +529,6 @@ class K8sHelmBaseConnector(K8sConnector): output, rc = exec_task.result() else: - output, rc = await self._local_async_exec( command=command, raise_exception_on_error=False, env=env ) @@ -497,8 +544,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="upgrade", - run_once=True, - check_every=0, ) if rc != 0: @@ -541,7 +586,6 @@ class K8sHelmBaseConnector(K8sConnector): async def rollback( self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None ): - _, cluster_id = self._get_namespace_cluster_id(cluster_uuid) self.log.debug( "rollback kdu_instance {} to revision {} from cluster {}".format( @@ -562,8 +606,11 @@ class K8sHelmBaseConnector(K8sConnector): cluster_name=cluster_id, create_if_not_exist=True ) + # sync local dir + self.fs.sync(from_path=cluster_id) + command = self._get_rollback_command( - kdu_instance, instance_info["namespace"], revision + kdu_instance, instance_info["namespace"], revision, paths["kube_config"] ) self.log.debug("rolling_back: {}".format(command)) @@ -582,7 +629,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=False, ) ) @@ -601,8 +647,6 @@ class K8sHelmBaseConnector(K8sConnector): namespace=instance_info["namespace"], db_dict=db_dict, operation="rollback", - run_once=True, - check_every=0, ) if rc != 0: @@ -647,14 +691,19 @@ class K8sHelmBaseConnector(K8sConnector): # look for instance to obtain namespace instance_info = await self.get_instance_info(cluster_uuid, kdu_instance) if not instance_info: - raise K8sException("kdu_instance {} not found".format(kdu_instance)) - + self.log.warning(("kdu_instance {} not found".format(kdu_instance))) + return True # init env, paths paths, env = self._init_paths_env( cluster_name=cluster_id, create_if_not_exist=True ) - command = self._get_uninstall_command(kdu_instance, instance_info["namespace"]) + # sync local dir + self.fs.sync(from_path=cluster_id) + + command = self._get_uninstall_command( + kdu_instance, instance_info["namespace"], paths["kube_config"] + ) output, _rc = await self._local_async_exec( command=command, raise_exception_on_error=True, env=env ) @@ -747,11 +796,18 @@ class K8sHelmBaseConnector(K8sConnector): ) ) + # init env, paths + paths, env = self._init_paths_env( + cluster_name=cluster_id, create_if_not_exist=True + ) + # sync local dir self.fs.sync(from_path=cluster_id) # get list of services names for kdu - service_names = await self._get_services(cluster_id, kdu_instance, namespace) + service_names = await self._get_services( + cluster_id, kdu_instance, namespace, paths["kube_config"] + ) service_list = [] for service in service_names: @@ -766,7 +822,6 @@ class K8sHelmBaseConnector(K8sConnector): async def get_service( self, cluster_uuid: str, service_name: str, namespace: str ) -> object: - self.log.debug( "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format( service_name, namespace, cluster_uuid @@ -785,7 +840,9 @@ class K8sHelmBaseConnector(K8sConnector): return service - async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str: + async def status_kdu( + self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs + ) -> Union[str, dict]: """ This call would retrieve tha current state of a given KDU instance. It would be would allow to retrieve the _composition_ (i.e. K8s objects) and _specific @@ -795,6 +852,8 @@ class K8sHelmBaseConnector(K8sConnector): :param cluster_uuid: UUID of a K8s cluster known by OSM :param kdu_instance: unique name for the KDU instance :param kwargs: Additional parameters (None yet) + :param yaml_format: if the return shall be returned as an YAML string or as a + dictionary :return: If successful, it will return the following vector of arguments: - K8s `namespace` in the cluster where the KDU lives - `state` of the KDU instance. It can be: @@ -837,8 +896,8 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=instance["namespace"], + yaml_format=yaml_format, show_error_log=True, - return_text=True, ) # sync fs @@ -847,7 +906,6 @@ class K8sHelmBaseConnector(K8sConnector): return status async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str: - self.log.debug( "inspect kdu_model values {} from (optional) repo: {}".format( kdu_model, repo_url @@ -859,7 +917,6 @@ class K8sHelmBaseConnector(K8sConnector): ) async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str: - self.log.debug( "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url) ) @@ -869,7 +926,6 @@ class K8sHelmBaseConnector(K8sConnector): ) async def synchronize_repos(self, cluster_uuid: str): - self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid)) try: db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid) @@ -971,7 +1027,7 @@ class K8sHelmBaseConnector(K8sConnector): """ @abc.abstractmethod - async def _get_services(self, cluster_id, kdu_instance, namespace): + async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig): """ Implements the helm version dependent method to obtain services from a helm instance """ @@ -982,16 +1038,24 @@ class K8sHelmBaseConnector(K8sConnector): cluster_id: str, kdu_instance: str, namespace: str = None, + yaml_format: bool = False, show_error_log: bool = False, - return_text: bool = False, - ): + ) -> Union[str, dict]: """ Implements the helm version dependent method to obtain status of a helm instance """ @abc.abstractmethod def _get_install_command( - self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, ) -> str: """ Obtain command to be executed to delete the indicated instance @@ -999,20 +1063,32 @@ class K8sHelmBaseConnector(K8sConnector): @abc.abstractmethod def _get_upgrade_command( - self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout + self, + kdu_model, + kdu_instance, + namespace, + params_str, + version, + atomic, + timeout, + kubeconfig, ) -> str: """ Obtain command to be executed to upgrade the indicated instance """ @abc.abstractmethod - def _get_rollback_command(self, kdu_instance, namespace, revision) -> str: + def _get_rollback_command( + self, kdu_instance, namespace, revision, kubeconfig + ) -> str: """ Obtain command to be executed to rollback the indicated instance """ @abc.abstractmethod - def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str: + def _get_uninstall_command( + self, kdu_instance: str, namespace: str, kubeconfig: str + ) -> str: """ Obtain command to be executed to delete the indicated instance """ @@ -1142,22 +1218,6 @@ class K8sHelmBaseConnector(K8sConnector): new_list.append(new_dict) return new_list - def _local_exec(self, command: str) -> (str, int): - command = self._remove_multiple_spaces(command) - self.log.debug("Executing sync local command: {}".format(command)) - # raise exception if fails - output = "" - try: - output = subprocess.check_output( - command, shell=True, universal_newlines=True - ) - return_code = 0 - self.log.debug(output) - except Exception: - return_code = 1 - - return output, return_code - async def _local_async_exec( self, command: str, @@ -1166,7 +1226,6 @@ class K8sHelmBaseConnector(K8sConnector): encode_utf8: bool = False, env: dict = None, ) -> (str, int): - command = K8sHelmBaseConnector._remove_multiple_spaces(command) self.log.debug( "Executing async local command: {}, env: {}".format(command, env) @@ -1237,7 +1296,6 @@ class K8sHelmBaseConnector(K8sConnector): encode_utf8: bool = False, env: dict = None, ): - command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1) command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2) command = "{} | {}".format(command1, command2) @@ -1382,47 +1440,55 @@ class K8sHelmBaseConnector(K8sConnector): operation: str, kdu_instance: str, namespace: str = None, - check_every: float = 10, db_dict: dict = None, - run_once: bool = False, - ): - while True: - try: - await asyncio.sleep(check_every) - detailed_status = await self._status_kdu( - cluster_id=cluster_id, - kdu_instance=kdu_instance, - namespace=namespace, - return_text=False, - ) - status = detailed_status.get("info").get("description") - self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status)) - # write status to db - result = await self.write_app_status_to_db( - db_dict=db_dict, - status=str(status), - detailed_status=str(detailed_status), - operation=operation, - ) - if not result: - self.log.info("Error writing in database. Task exiting...") - return - except asyncio.CancelledError: - self.log.debug("Task cancelled") - return - except Exception as e: - self.log.debug( - "_store_status exception: {}".format(str(e)), exc_info=True - ) - pass - finally: - if run_once: - return + ) -> None: + """ + Obtains the status of the KDU instance based on Helm Charts, and stores it in the database. + + :param cluster_id (str): the cluster where the KDU instance is deployed + :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade") + :param kdu_instance (str): The KDU instance in relation to which the status is obtained + :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None + :param db_dict (dict): A dictionary with the database necessary information. It shall contain the + values for the keys: + - "collection": The Mongo DB collection to write to + - "filter": The query filter to use in the update process + - "path": The dot separated keys which targets the object to be updated + Defaults to None. + """ + + try: + detailed_status = await self._status_kdu( + cluster_id=cluster_id, + kdu_instance=kdu_instance, + yaml_format=False, + namespace=namespace, + ) + + status = detailed_status.get("info").get("description") + self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.") + + # write status to db + result = await self.write_app_status_to_db( + db_dict=db_dict, + status=str(status), + detailed_status=str(detailed_status), + operation=operation, + ) + + if not result: + self.log.info("Error writing in database. Task exiting...") + + except asyncio.CancelledError as e: + self.log.warning( + f"Exception in method {self._store_status.__name__} (task cancelled): {e}" + ) + except Exception as e: + self.log.warning(f"Exception in method {self._store_status.__name__}: {e}") # params for use in -f file # returns values file option and filename (in order to delete it at the end) def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str): - if params and len(params) > 0: self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True) @@ -1471,11 +1537,11 @@ class K8sHelmBaseConnector(K8sConnector): # check embeded chart (file or dir) if chart_name.startswith("/"): # extract file or directory name - chart_name = chart_name[chart_name.rfind("/") + 1:] + chart_name = chart_name[chart_name.rfind("/") + 1 :] # check URL elif "://" in chart_name: # extract last portion of URL - chart_name = chart_name[chart_name.rfind("/") + 1:] + chart_name = chart_name[chart_name.rfind("/") + 1 :] name = "" for c in chart_name: @@ -1500,3 +1566,10 @@ class K8sHelmBaseConnector(K8sConnector): name = name + get_random_number() return name.lower() + + def _split_repo(self, kdu_model: str) -> str: + repo_name = None + idx = kdu_model.find("/") + if idx >= 0: + repo_name = kdu_model[:idx] + return repo_name