- async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_comand(
- inspect_command="", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model values {} from (optional) repo: {}".format(
- kdu_model, repo_url
- )
- )
-
- return await self._exec_inspect_comand(
- inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_comand(
- inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
-
- # call internal function
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- return await self._status_kdu(
- cluster_id=cluster_id,
- kdu_instance=kdu_instance,
- show_error_log=True,
- return_text=True,
- )
-
- async def get_services(self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str) -> list:
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug(
- "get_services: cluster_uuid: {}, kdu_instance: {}".format(
- cluster_uuid, kdu_instance
- )
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- status = await self._status_kdu(
- cluster_id, kdu_instance, return_text=False
- )
-
- service_names = self._parse_helm_status_service_info(status)
- service_list = []
- for service in service_names:
- service = await self.get_service(cluster_uuid, service, namespace)
- service_list.append(service)
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- return service_list
-
- async def get_service(self,
- cluster_uuid: str,
- service_name: str,
- namespace: str) -> object:
-
- self.log.debug(
- "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
- service_name, namespace, cluster_uuid)
- )
-
- # get paths
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # sync local dir
- self.fs.sync(from_path=cluster_id)
-
- command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
- self.kubectl_command, config_filename, namespace, service_name
- )
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
-
- data = yaml.load(output, Loader=yaml.SafeLoader)
-
- service = {
- "name": service_name,
- "type": self._get_deep(data, ("spec", "type")),
- "ports": self._get_deep(data, ("spec", "ports")),
- "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
- }
- if service["type"] == "LoadBalancer":
- ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
- ip_list = [elem["ip"] for elem in ip_map_list]
- service["external_ip"] = ip_list
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_id)
-
- return service
-
- async def synchronize_repos(self, cluster_uuid: str):
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("syncronize repos for cluster helm-id: {}",)
- try:
- update_repos_timeout = (
- 300 # max timeout to sync a single repos, more than this is too much
- )
- db_k8scluster = self.db.get_one(
- "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
- )
- if db_k8scluster:
- nbi_repo_list = (
- db_k8scluster.get("_admin").get("helm_chart_repos") or []
- )
- cluster_repo_dict = (
- db_k8scluster.get("_admin").get("helm_charts_added") or {}
- )
- # elements that must be deleted
- deleted_repo_list = []
- added_repo_dict = {}
- self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
- self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
-
- # obtain repos to add: registered by nbi but not added
- repos_to_add = [
- repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
- ]
-
- # obtain repos to delete: added by cluster but not in nbi list
- repos_to_delete = [
- repo
- for repo in cluster_repo_dict.keys()
- if repo not in nbi_repo_list
- ]
-
- # delete repos: must delete first then add because there may be
- # different repos with same name but
- # different id and url
- self.log.debug("repos to delete: {}".format(repos_to_delete))
- for repo_id in repos_to_delete:
- # try to delete repos
- try:
- repo_delete_task = asyncio.ensure_future(
- self.repo_remove(
- cluster_uuid=cluster_uuid,
- name=cluster_repo_dict[repo_id],
- )
- )
- await asyncio.wait_for(repo_delete_task, update_repos_timeout)
- except Exception as e:
- self.warning(
- "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
- repo_id, cluster_repo_dict[repo_id], str(e)
- )
- )
- # always add to the list of to_delete if there is an error
- # because if is not there
- # deleting raises error
- deleted_repo_list.append(repo_id)
-
- # add repos
- self.log.debug("repos to add: {}".format(repos_to_add))
- for repo_id in repos_to_add:
- # obtain the repo data from the db
- # if there is an error getting the repo in the database we will
- # ignore this repo and continue
- # because there is a possible race condition where the repo has
- # been deleted while processing
- db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
- self.log.debug(
- "obtained repo: id, {}, name: {}, url: {}".format(
- repo_id, db_repo["name"], db_repo["url"]
- )
- )
- try:
- repo_add_task = asyncio.ensure_future(
- self.repo_add(
- cluster_uuid=cluster_uuid,
- name=db_repo["name"],
- url=db_repo["url"],
- repo_type="chart",
- )
- )
- await asyncio.wait_for(repo_add_task, update_repos_timeout)
- added_repo_dict[repo_id] = db_repo["name"]
- self.log.debug(
- "added repo: id, {}, name: {}".format(
- repo_id, db_repo["name"]
- )
- )
- except Exception as e:
- # deal with error adding repo, adding a repo that already
- # exists does not raise any error
- # will not raise error because a wrong repos added by
- # anyone could prevent instantiating any ns
- self.log.error(
- "Error adding repo id: {}, err_msg: {} ".format(
- repo_id, repr(e)
- )
- )
-
- return deleted_repo_list, added_repo_dict
-
- else: # else db_k8scluster does not exist
- raise K8sException(
- "k8cluster with helm-id : {} not found".format(cluster_uuid)
- )
-
- except Exception as e:
- self.log.error("Error synchronizing repos: {}".format(str(e)))
- raise K8sException("Error synchronizing repos")
-
- """
- ####################################################################################
- ################################### P R I V A T E ##################################
- ####################################################################################
- """
-
- async def _exec_inspect_comand(
- self, inspect_command: str, kdu_model: str, repo_url: str = None
- ):
-
- repo_str = ""
- if repo_url:
- repo_str = " --repo {}".format(repo_url)
- idx = kdu_model.find("/")
- if idx >= 0:
- idx += 1
- kdu_model = kdu_model[idx:]
-
- inspect_command = "{} inspect {} {}{}".format(
- self._helm_command, inspect_command, kdu_model, repo_str
- )
- output, _rc = await self._local_async_exec(
- command=inspect_command, encode_utf8=True
- )
-
- return output
-
- async def _status_kdu(
- self,
- cluster_id: str,
- kdu_instance: str,
- show_error_log: bool = False,
- return_text: bool = False,
- ):
-
- self.log.debug("status of kdu_instance {}".format(kdu_instance))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
- self._helm_command, config_filename, helm_dir, kdu_instance
- )
-
- output, rc = await self._local_async_exec(
- command=command,
- raise_exception_on_error=True,
- show_error_log=show_error_log,
- )
-
- if return_text:
- return str(output)
-
- if rc != 0:
- return None
-
- data = yaml.load(output, Loader=yaml.SafeLoader)
-
- # remove field 'notes'
- try:
- del data.get("info").get("status")["notes"]
- except KeyError:
- pass
-
- # parse field 'resources'
- try:
- resources = str(data.get("info").get("status").get("resources"))
- resource_table = self._output_to_table(resources)
- data.get("info").get("status")["resources"] = resource_table
- except Exception:
- pass
-
- return data
-
- async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
- instances = await self.instances_list(cluster_uuid=cluster_uuid)
- for instance in instances:
- if instance.get("Name") == kdu_instance:
- return instance
- self.log.debug("Instance {} not found".format(kdu_instance))
- return None
-
- @staticmethod
- def _generate_release_name(chart_name: str):
- # check embeded chart (file or dir)
- if chart_name.startswith("/"):
- # extract file or directory name
- chart_name = chart_name[chart_name.rfind("/") + 1 :]
- # check URL
- elif "://" in chart_name:
- # extract last portion of URL
- chart_name = chart_name[chart_name.rfind("/") + 1 :]
-
- name = ""
- for c in chart_name:
- if c.isalpha() or c.isnumeric():
- name += c
- else:
- name += "-"
- if len(name) > 35:
- name = name[0:35]
-
- # if does not start with alpha character, prefix 'a'
- if not name[0].isalpha():
- name = "a" + name
-
- name += "-"
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- s = s.rjust(10, "0")
- return s
-
- name = name + get_random_number()
- return name.lower()
-
- async def _store_status(
- self,
- cluster_id: str,
- operation: str,
- kdu_instance: str,
- check_every: float = 10,
- db_dict: dict = None,
- run_once: bool = False,
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance,
- return_text=False
- )
- status = detailed_status.get("info").get("Description")
- self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation,
- )
- if not result:
- self.log.info("Error writing in database. Task exiting...")
- return
- except asyncio.CancelledError:
- self.log.debug("Task cancelled")
- return
- except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
- pass
- finally:
- if run_once:
- return
-
- async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
-
- status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False