- # write final status
- await self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="install",
- run_once=True,
- check_every=0,
- )
-
- if rc != 0:
- msg = "Error executing command: {}\nOutput: {}".format(command, output)
- self.log.error(msg)
- raise K8sException(msg)
-
- self.log.debug("Returning kdu_instance {}".format(kdu_instance))
- return kdu_instance
-
- async def instances_list(self, cluster_uuid: str) -> list:
- """
- returns a list of deployed releases in a cluster
-
- :param cluster_uuid: the cluster
- :return:
- """
-
- self.log.debug("list releases for cluster {}".format(cluster_uuid))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
- )
-
- command = "{} --kubeconfig={} --home={} list --output yaml".format(
- self._helm_command, config_filename, helm_dir
- )
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
-
- if output and len(output) > 0:
- return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
- else:
- return []
-
- async def upgrade(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- kdu_model: str = None,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- ):
-
- self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
- )
-
- # params to str
- # params_str = K8sHelmConnector._params_to_set_option(params)
- params_str, file_to_delete = self._params_to_file_option(
- cluster_uuid=cluster_uuid, params=params
- )
-
- timeout_str = ""
- if timeout:
- timeout_str = "--timeout {}".format(timeout)
-
- # atomic
- atomic_str = ""
- if atomic:
- atomic_str = "--atomic"
-
- # version
- version_str = ""
- if kdu_model and ":" in kdu_model:
- parts = kdu_model.split(sep=":")
- if len(parts) == 2:
- version_str = "--version {}".format(parts[1])
- kdu_model = parts[0]
-
- # helm repo upgrade
- command = (
- "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
- ).format(
- self._helm_command,
- atomic_str,
- config_filename,
- helm_dir,
- params_str,
- timeout_str,
- kdu_instance,
- kdu_model,
- version_str,
- )
- self.log.debug("upgrading: {}".format(command))
-
- if atomic:
-
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- )
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="upgrade",
- run_once=False,
- )
- )
-
- # wait for execution task
- await asyncio.wait([exec_task])
-
- # cancel status task
- status_task.cancel()
- output, rc = exec_task.result()
-
- else:
-
- output, rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
-
- # remove temporal values yaml file
- if file_to_delete:
- os.remove(file_to_delete)
-
- # write final status
- await self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="upgrade",
- run_once=True,
- check_every=0,
- )
-
- if rc != 0:
- msg = "Error executing command: {}\nOutput: {}".format(command, output)
- self.log.error(msg)
- raise K8sException(msg)
-
- # return new revision number
- instance = await self.get_instance_info(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- if instance:
- revision = int(instance.get("Revision"))
- self.log.debug("New revision: {}".format(revision))
- return revision
- else:
- return 0
-
- async def rollback(
- self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
- ):
-
- self.log.debug(
- "rollback kdu_instance {} to revision {} from cluster {}".format(
- kdu_instance, revision, cluster_uuid
- )
- )
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
- )
-
- command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
- self._helm_command, config_filename, helm_dir, kdu_instance, revision
- )
-
- # exec helm in a task
- exec_task = asyncio.ensure_future(
- coro_or_future=self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- )
- # write status in another task
- status_task = asyncio.ensure_future(
- coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="rollback",
- run_once=False,
- )
- )
-
- # wait for execution task
- await asyncio.wait([exec_task])
-
- # cancel status task
- status_task.cancel()
-
- output, rc = exec_task.result()
-
- # write final status
- await self._store_status(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- db_dict=db_dict,
- operation="rollback",
- run_once=True,
- check_every=0,
- )
-
- if rc != 0:
- msg = "Error executing command: {}\nOutput: {}".format(command, output)
- self.log.error(msg)
- raise K8sException(msg)
-
- # return new revision number
- instance = await self.get_instance_info(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- if instance:
- revision = int(instance.get("Revision"))
- self.log.debug("New revision: {}".format(revision))
- return revision
- else:
- return 0
-
- async def uninstall(self, cluster_uuid: str, kdu_instance: str):
- """
- Removes an existing KDU instance. It would implicitly use the `delete` call
- (this call would happen after all _terminate-config-primitive_ of the VNF
- are invoked).
-
- :param cluster_uuid: UUID of a K8s cluster known by OSM
- :param kdu_instance: unique name for the KDU instance to be deleted
- :return: True if successful
- """
-
- self.log.debug(
- "uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_uuid
- )
- )
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
- )
-
- command = "{} --kubeconfig={} --home={} delete --purge {}".format(
- self._helm_command, config_filename, helm_dir, kdu_instance
- )
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
-
- return self._output_to_table(output)
-
- async def exec_primitive(
- self,
- cluster_uuid: str = None,
- kdu_instance: str = None,
- primitive_name: str = None,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- ) -> str:
- """Exec primitive (Juju action)
-
- :param cluster_uuid str: The UUID of the cluster
- :param kdu_instance str: The unique name of the KDU instance
- :param primitive_name: Name of action that will be executed
- :param timeout: Timeout for action execution
- :param params: Dictionary of all the parameters needed for the action
- :db_dict: Dictionary for any additional data
-
- :return: Returns the output of the action
- """
- raise K8sException(
- "KDUs deployed with Helm don't support actions "
- "different from rollback, upgrade and status"
- )
-
- async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_comand(
- inspect_command="", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model values {} from (optional) repo: {}".format(
- kdu_model, repo_url
- )
- )
-
- return await self._exec_inspect_comand(
- inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_comand(
- inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
- )
-
- async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
-
- # call internal function
- return await self._status_kdu(
- cluster_uuid=cluster_uuid,
- kdu_instance=kdu_instance,
- show_error_log=True,
- return_text=True,
- )
-
- async def synchronize_repos(self, cluster_uuid: str):
-
- self.log.debug("syncronize repos for cluster helm-id: {}",)
- try:
- update_repos_timeout = (
- 300 # max timeout to sync a single repos, more than this is too much
- )
- db_k8scluster = self.db.get_one(
- "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
- )
- if db_k8scluster:
- nbi_repo_list = (
- db_k8scluster.get("_admin").get("helm_chart_repos") or []
- )
- cluster_repo_dict = (
- db_k8scluster.get("_admin").get("helm_charts_added") or {}
- )
- # elements that must be deleted
- deleted_repo_list = []
- added_repo_dict = {}
- self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
- self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
-
- # obtain repos to add: registered by nbi but not added
- repos_to_add = [
- repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
- ]
-
- # obtain repos to delete: added by cluster but not in nbi list
- repos_to_delete = [
- repo
- for repo in cluster_repo_dict.keys()
- if repo not in nbi_repo_list
- ]
-
- # delete repos: must delete first then add because there may be
- # different repos with same name but
- # different id and url
- self.log.debug("repos to delete: {}".format(repos_to_delete))
- for repo_id in repos_to_delete:
- # try to delete repos
- try:
- repo_delete_task = asyncio.ensure_future(
- self.repo_remove(
- cluster_uuid=cluster_uuid,
- name=cluster_repo_dict[repo_id],
- )
- )
- await asyncio.wait_for(repo_delete_task, update_repos_timeout)
- except Exception as e:
- self.warning(
- "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
- repo_id, cluster_repo_dict[repo_id], str(e)
- )
- )
- # always add to the list of to_delete if there is an error
- # because if is not there
- # deleting raises error
- deleted_repo_list.append(repo_id)
-
- # add repos
- self.log.debug("repos to add: {}".format(repos_to_add))
- for repo_id in repos_to_add:
- # obtain the repo data from the db
- # if there is an error getting the repo in the database we will
- # ignore this repo and continue
- # because there is a possible race condition where the repo has
- # been deleted while processing
- db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
- self.log.debug(
- "obtained repo: id, {}, name: {}, url: {}".format(
- repo_id, db_repo["name"], db_repo["url"]
- )
- )
- try:
- repo_add_task = asyncio.ensure_future(
- self.repo_add(
- cluster_uuid=cluster_uuid,
- name=db_repo["name"],
- url=db_repo["url"],
- repo_type="chart",
- )
- )
- await asyncio.wait_for(repo_add_task, update_repos_timeout)
- added_repo_dict[repo_id] = db_repo["name"]
- self.log.debug(
- "added repo: id, {}, name: {}".format(
- repo_id, db_repo["name"]
- )
- )
- except Exception as e:
- # deal with error adding repo, adding a repo that already
- # exists does not raise any error
- # will not raise error because a wrong repos added by
- # anyone could prevent instantiating any ns
- self.log.error(
- "Error adding repo id: {}, err_msg: {} ".format(
- repo_id, repr(e)
- )
- )
-
- return deleted_repo_list, added_repo_dict
-
- else: # else db_k8scluster does not exist
- raise K8sException(
- "k8cluster with helm-id : {} not found".format(cluster_uuid)
- )
-
- except Exception as e:
- self.log.error("Error synchronizing repos: {}".format(str(e)))
- raise K8sException("Error synchronizing repos")
-
- """
- ####################################################################################
- ################################### P R I V A T E ##################################
- ####################################################################################
- """
-
- async def _exec_inspect_comand(
- self, inspect_command: str, kdu_model: str, repo_url: str = None
- ):
-
- repo_str = ""
- if repo_url:
- repo_str = " --repo {}".format(repo_url)
- idx = kdu_model.find("/")
- if idx >= 0:
- idx += 1
- kdu_model = kdu_model[idx:]
-
- inspect_command = "{} inspect {} {}{}".format(
- self._helm_command, inspect_command, kdu_model, repo_str
- )
- output, _rc = await self._local_async_exec(
- command=inspect_command, encode_utf8=True
- )
-
- return output
-
- async def _status_kdu(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- show_error_log: bool = False,
- return_text: bool = False,
- ):
-
- self.log.debug("status of kdu_instance {}".format(kdu_instance))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
- )
-
- command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
- self._helm_command, config_filename, helm_dir, kdu_instance
- )
-
- output, rc = await self._local_async_exec(
- command=command,
- raise_exception_on_error=True,
- show_error_log=show_error_log,
- )
-
- if return_text:
- return str(output)
-
- if rc != 0:
- return None
-
- data = yaml.load(output, Loader=yaml.SafeLoader)
-
- # remove field 'notes'
- try:
- del data.get("info").get("status")["notes"]
- except KeyError:
- pass
-
- # parse field 'resources'
- try:
- resources = str(data.get("info").get("status").get("resources"))
- resource_table = self._output_to_table(resources)
- data.get("info").get("status")["resources"] = resource_table
- except Exception:
- pass
-
- return data
-
- async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
- instances = await self.instances_list(cluster_uuid=cluster_uuid)
- for instance in instances:
- if instance.get("Name") == kdu_instance:
- return instance
- self.log.debug("Instance {} not found".format(kdu_instance))
- return None
-
- @staticmethod
- def _generate_release_name(chart_name: str):
- # check embeded chart (file or dir)
- if chart_name.startswith("/"):
- # extract file or directory name
- chart_name = chart_name[chart_name.rfind("/") + 1 :]
- # check URL
- elif "://" in chart_name:
- # extract last portion of URL
- chart_name = chart_name[chart_name.rfind("/") + 1 :]
-
- name = ""
- for c in chart_name:
- if c.isalpha() or c.isnumeric():
- name += c
- else:
- name += "-"
- if len(name) > 35:
- name = name[0:35]
-
- # if does not start with alpha character, prefix 'a'
- if not name[0].isalpha():
- name = "a" + name
-
- name += "-"
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- s = s.rjust(10, "0")
- return s
-
- name = name + get_random_number()
- return name.lower()
-
- async def _store_status(
- self,
- cluster_uuid: str,
- operation: str,
- kdu_instance: str,
- check_every: float = 10,
- db_dict: dict = None,
- run_once: bool = False,
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self._status_kdu(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance,
- return_text=False
- )
- status = detailed_status.get("info").get("Description")
- self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation,
- )
- if not result:
- self.log.info("Error writing in database. Task exiting...")
- return
- except asyncio.CancelledError:
- self.log.debug("Task cancelled")
- return
- except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)))
- pass
- finally:
- if run_once:
- return
-
- async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:
-
- status = await self._status_kdu(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False