-
- timeout_str = ""
- if timeout:
- timeout_str = "--timeout {}".format(timeout)
-
- # atomic
- atomic_str = ""
- if atomic:
- atomic_str = "--atomic"
-
- # version
- version_str = ""
- if kdu_model and ":" in kdu_model:
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- version_str = "--version {}".format(parts[1])
- kdu_model = parts[0]
-
- # helm repo upgrade
- command = (
- "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
- ).format(
- self._helm_command,
- atomic_str,
- config_filename,
- helm_dir,
- params_str,
- timeout_str,
- kdu_instance,
- kdu_model,
- version_str,
- )
- self.log.debug("upgrading: {}".format(command))
-
- if atomic:
-
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- )
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="upgrade",
- run_once=False,
- )
- )
-
- # wait for execution task
- await asyncio.wait([exec_task])
-
- # cancel status task
- status_task.cancel()
- output, rc = exec_task.result()
-
- else:
-
- output, rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
-
- # remove temporal values yaml file
- if file_to_delete:
- os.remove(file_to_delete)
-
- # write final status
- await self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="upgrade",
- run_once=True,
- check_every=0,
- )
-
- if rc != 0:
- msg = "Error executing command: {}\nOutput: {}".format(command, output)
- self.log.error(msg)
- raise K8sException(msg)
-
- # return new revision number
- instance = await self.get_instance_info(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- if instance:
- revision = int(instance.get("Revision"))
- self.log.debug("New revision: {}".format(revision))
- return revision
- else:
- return 0
-
- async def rollback(
- self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
- ):
-
- self.log.debug(
- "rollback kdu_instance {} to revision {} from cluster {}".format(
- kdu_instance, revision, cluster_uuid
- )
- )
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
- )
-
- command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
- self._helm_command, config_filename, helm_dir, kdu_instance, revision
- )
-
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- )
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="rollback",
- run_once=False,
- )
- )
-
- # wait for execution task
- await asyncio.wait([exec_task])
-
- # cancel status task
- status_task.cancel()
-
- output, rc = exec_task.result()
-
- # write final status
- await self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="rollback",
- run_once=True,
- check_every=0,
- )
-
- if rc != 0:
- msg = "Error executing command: {}\nOutput: {}".format(command, output)
- self.log.error(msg)
- raise K8sException(msg)
-
- # return new revision number
- instance = await self.get_instance_info(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- if instance:
- revision = int(instance.get("Revision"))
- self.log.debug("New revision: {}".format(revision))
- return revision
- else:
- return 0
-
- async def uninstall(self, cluster_uuid: str, kdu_instance: str):
- """
- Removes an existing KDU instance. It would implicitly use the `delete` call
- (this call would happen after all _terminate-config-primitive_ of the VNF
- are invoked).
-
- :param cluster_uuid: UUID of a K8s cluster known by OSM
- :param kdu_instance: unique name for the KDU instance to be deleted
- :return: True if successful
- """
-
- self.log.debug(
- "uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_uuid
- )
- )
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
- )
-
- command = "{} --kubeconfig={} --home={} delete --purge {}".format(
- self._helm_command, config_filename, helm_dir, kdu_instance
- )
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
-
- return self._output_to_table(output)
-
- async def exec_primitive(
- self,
- cluster_uuid: str = None,
- kdu_instance: str = None,
- primitive_name: str = None,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- ) -> str:
- """Exec primitive (Juju action)
-
- :param cluster_uuid str: The UUID of the cluster
- :param kdu_instance str: The unique name of the KDU instance
- :param primitive_name: Name of action that will be executed
- :param timeout: Timeout for action execution
- :param params: Dictionary of all the parameters needed for the action
- :db_dict: Dictionary for any additional data
-
- :return: Returns the output of the action
- """
- raise K8sException(
- "KDUs deployed with Helm don't support actions "
- "different from rollback, upgrade and status"
- )
-
- async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_comand(
- inspect_command="", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model values {} from (optional) repo: {}".format(
- kdu_model, repo_url
- )
- )
-
- return await self._exec_inspect_comand(
- inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_comand(
- inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
-
- # call internal function
- return await self._status_kdu(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- show_error_log=True,
- return_text=True,
- )
-
- async def synchronize_repos(self, cluster_uuid: str):
-
- self.log.debug("syncronize repos for cluster helm-id: {}",)
- try:
- update_repos_timeout = (
- 300 # max timeout to sync a single repos, more than this is too much
- )
- db_k8scluster = self.db.get_one(
- "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
- )
- if db_k8scluster:
- nbi_repo_list = (
- db_k8scluster.get("_admin").get("helm_chart_repos") or []
- )
- cluster_repo_dict = (
- db_k8scluster.get("_admin").get("helm_charts_added") or {}
- )
- # elements that must be deleted
- deleted_repo_list = []
- added_repo_dict = {}
- self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
- self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
-
- # obtain repos to add: registered by nbi but not added
- repos_to_add = [
- repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
- ]
-
- # obtain repos to delete: added by cluster but not in nbi list
- repos_to_delete = [
- repo
- for repo in cluster_repo_dict.keys()
- if repo not in nbi_repo_list
- ]
-
- # delete repos: must delete first then add because there may be
- # different repos with same name but
- # different id and url
- self.log.debug("repos to delete: {}".format(repos_to_delete))
- for repo_id in repos_to_delete:
- # try to delete repos
- try:
- repo_delete_task = asyncio.ensure_future(
- self.repo_remove(
- cluster_uuid=cluster_uuid,
- name=cluster_repo_dict[repo_id],
- )
- )
- await asyncio.wait_for(repo_delete_task, update_repos_timeout)
- except Exception as e:
- self.warning(
- "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
- repo_id, cluster_repo_dict[repo_id], str(e)
- )
- )
- # always add to the list of to_delete if there is an error
- # because if is not there
- # deleting raises error
- deleted_repo_list.append(repo_id)
-
- # add repos
- self.log.debug("repos to add: {}".format(repos_to_add))
- for repo_id in repos_to_add:
- # obtain the repo data from the db
- # if there is an error getting the repo in the database we will
- # ignore this repo and continue
- # because there is a possible race condition where the repo has
- # been deleted while processing
- db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
- self.log.debug(
- "obtained repo: id, {}, name: {}, url: {}".format(
- repo_id, db_repo["name"], db_repo["url"]
- )
- )
- try:
- repo_add_task = asyncio.ensure_future(
- self.repo_add(
- cluster_uuid=cluster_uuid,
- name=db_repo["name"],
- url=db_repo["url"],
- repo_type="chart",
- )
- )
- await asyncio.wait_for(repo_add_task, update_repos_timeout)
- added_repo_dict[repo_id] = db_repo["name"]
- self.log.debug(
- "added repo: id, {}, name: {}".format(
- repo_id, db_repo["name"]
- )
- )
- except Exception as e:
- # deal with error adding repo, adding a repo that already
- # exists does not raise any error
- # will not raise error because a wrong repos added by
- # anyone could prevent instantiating any ns
- self.log.error(
- "Error adding repo id: {}, err_msg: {} ".format(
- repo_id, repr(e)
- )
- )
-
- return deleted_repo_list, added_repo_dict
-
- else: # else db_k8scluster does not exist
- raise K8sException(
- "k8cluster with helm-id : {} not found".format(cluster_uuid)
- )
-
- except Exception as e:
- self.log.error("Error synchronizing repos: {}".format(str(e)))
- raise K8sException("Error synchronizing repos")
-
- """
- ####################################################################################
- ################################### P R I V A T E ##################################
- ####################################################################################
- """
-
- async def _exec_inspect_comand(
- self, inspect_command: str, kdu_model: str, repo_url: str = None
- ):
-
- repo_str = ""
- if repo_url:
- repo_str = " --repo {}".format(repo_url)
- idx = kdu_model.find("/")
- if idx >= 0:
- idx += 1
- kdu_model = kdu_model[idx:]
-
- inspect_command = "{} inspect {} {}{}".format(
- self._helm_command, inspect_command, kdu_model, repo_str
- )
- output, _rc = await self._local_async_exec(
- command=inspect_command, encode_utf8=True
- )
-
- return output