- self.log.info("K8S Helm connector initialized")
-
- @staticmethod
- def _get_namespace_cluster_id(cluster_uuid: str) -> Tuple[str, str]:  # assumes 'from typing import Tuple' at module top
- """
- Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
- cluster_id for backward compatibility
- """
- namespace, _, cluster_id = cluster_uuid.rpartition(':')
- return namespace, cluster_id
-
- async def init_env(
- self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
- ) -> Tuple[str, bool]:
- """
- It prepares a given K8s cluster environment to run Charts on both sides:
- client (OSM)
- server (Tiller)
-
- :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
- '.kube/config'
- :param namespace: optional namespace to be used for helm. By default,
- 'kube-system' will be used
- :param reuse_cluster_uuid: existing cluster uuid for reuse
- :return: uuid of the K8s cluster and True if connector has installed some
- software in the cluster
- (on error, an exception will be raised)
- """
-
- if reuse_cluster_uuid:
- namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
- namespace = namespace_ or namespace
- else:
- cluster_id = str(uuid4())
- cluster_uuid = "{}:{}".format(namespace, cluster_id)
-
- self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
-
- # resolve the per-cluster paths and write the credentials to the kubeconfig file
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
- with open(config_filename, "w") as f:
- f.write(k8s_creds)
-
- # check whether the tiller deployment is already present in the cluster
- command = "{} --kubeconfig={} --namespace={} get deployments".format(
- self.kubectl_command, config_filename, namespace
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
-
- output_table = self._output_to_table(output=output)
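- # (assumption: _output_to_table() splits each line of the kubectl output
- # into a row of whitespace-separated columns)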
-
- # look for a 'tiller-deploy' deployment among the listed deployments
- already_initialized = False
- try:
- for row in output_table:
- if row[0].startswith("tiller-deploy"):
- already_initialized = True
- break
- except Exception:
- # tolerate empty or malformed rows in the parsed output
- pass
-
- # helm init
- n2vc_installed_sw = False
- if not already_initialized:
- self.log.info(
- "Initializing helm in client and server: {}".format(cluster_id)
- )
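- # Bootstrap tiller in three steps: create its service account, bind it
- # to cluster-admin, then run 'helm init' against that service account.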
- command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
- self.kubectl_command, config_filename, self.service_account)
- _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
-
- command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
- "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
- ).format(self.kubectl_command, config_filename, self.service_account)
- _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
-
- command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
- "init").format(self._helm_command, config_filename, namespace, helm_dir,
- self.service_account)
- _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
- n2vc_installed_sw = True
- else:
- # check client helm installation
- check_file = helm_dir + "/repository/repositories.yaml"
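- # repositories.yaml is created by helm during client initialization;
- # if it is missing, the local helm home has not been set up yet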
- if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
- self.log.info("Initializing helm in client: {}".format(cluster_id))
- command = (
- "{} --kubeconfig={} --tiller-namespace={} "
- "--home={} init --client-only"
- ).format(self._helm_command, config_filename, namespace, helm_dir)
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
- else:
- self.log.info("Helm client already initialized")
-
- self.log.info("Cluster {} initialized".format(cluster_id))
-
- return cluster_uuid, n2vc_installed_sw
-
- async def repo_add(
- self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
- ):
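- """
- Add a helm repository to the cluster environment
-
- :param cluster_uuid: the cluster or 'namespace:cluster'
- :param name: repo name
- :param url: repo url
- :param repo_type: repository type (defaults to 'chart')
- """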
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
- cluster_id, repo_type, name, url))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- # helm repo update
- command = "{} --kubeconfig={} --home={} repo update".format(
- self._helm_command, config_filename, helm_dir
- )
- self.log.debug("updating repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=False)
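- # note: the update above only refreshes repos already registered;
- # the new repository is added next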
-
- # helm repo add name url
- command = "{} --kubeconfig={} --home={} repo add {} {}".format(
- self._helm_command, config_filename, helm_dir, name, url
- )
- self.log.debug("adding repo: {}".format(command))
- await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- async def repo_list(self, cluster_uuid: str) -> list:
- """
- Get the list of registered repositories
-
- :return: list of registered repositories: [ (name, url) .... ]
- """
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("list repositories for cluster {}".format(cluster_id))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
- self._helm_command, config_filename, helm_dir
- )
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
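- # helm prints the repository list as YAML; parse it into a Python list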
- if output:
- return yaml.safe_load(output)
- else:
- return []
-
- async def repo_remove(self, cluster_uuid: str, name: str):
- """
- Remove a repository from OSM
-
- :param cluster_uuid: the cluster or 'namespace:cluster'
- :param name: repo name in OSM
- :return: True if successful
- """
-
- _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug("list repositories for cluster {}".format(cluster_id))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- command = "{} --kubeconfig={} --home={} repo remove {}".format(
- self._helm_command, config_filename, helm_dir, name
- )
-
- await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- async def reset(
- self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
- ) -> bool:
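- """
- Resets the K8s environment for a cluster: optionally uninstalls all
- releases (force) and removes tiller and its RBAC artifacts
- (uninstall_sw), then deletes the local cluster directory
-
- :param cluster_uuid: the cluster or 'namespace:cluster'
- :param force: uninstall existing releases before resetting
- :param uninstall_sw: also uninstall tiller from the cluster
- :return: True if successful
- """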
-
- namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug(
- "Resetting K8s environment. cluster uuid: {}".format(cluster_id)
- )
-
- # get kube and helm directories
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=False
- )
-
- # uninstall releases if needed
- releases = await self.instances_list(cluster_uuid=cluster_uuid)
- if releases:
- if force:
- for r in releases:
- # pre-initialize so the error log below cannot hit an unbound name
- kdu_instance = None
- try:
- kdu_instance = r.get("Name")
- chart = r.get("Chart")
- self.log.debug(
- "Uninstalling {} -> {}".format(chart, kdu_instance)
- )
- await self.uninstall(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- except Exception as e:
- self.log.error(
- "Error uninstalling release {}: {}".format(kdu_instance, e)
- )
- else:
- msg = (
- "Cluster has releases and force is not enabled. Cannot reset "
- "K8s environment. Cluster uuid: {}"
- ).format(cluster_id)
- self.log.error(msg)
- raise K8sException(msg)
-
- if uninstall_sw:
-
- self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
-
- if not namespace:
- # find the namespace of the tiller deployment
- command = "{} --kubeconfig={} get deployments --all-namespaces".format(
- self.kubectl_command, config_filename
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- output_table = K8sHelmConnector._output_to_table(output=output)
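- # with --all-namespaces, column 0 is the namespace and column 1 the
- # deployment name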
- namespace = None
- for r in output_table:
- try:
- if "tiller-deploy" in r[1]:
- namespace = r[0]
- break
- except Exception:
- # tolerate short or malformed rows
- pass
- else:
- # for/else: executed only if no tiller-deploy row was found
- msg = "Tiller deployment not found in cluster {}".format(cluster_id)
- self.log.error(msg)
-
- self.log.debug("namespace for tiller: {}".format(namespace))
-
- if namespace:
- # uninstall tiller from cluster
- self.log.debug(
- "Running helm reset on cluster {}".format(cluster_id)
- )
- command = "{} --kubeconfig={} --home={} reset".format(
- self._helm_command, config_filename, helm_dir
- )
- self.log.debug("resetting: {}".format(command))
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
- )
- # Delete the clusterrolebinding and serviceaccount.
- # Errors are ignored, for backward compatibility
- command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
- "io/osm-tiller-cluster-rule").format(self.kubectl_command,
- config_filename)
- output, _rc = await self._local_async_exec(command=command,
- raise_exception_on_error=False)
- command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
- format(self.kubectl_command, config_filename, self.service_account)
- output, _rc = await self._local_async_exec(command=command,
- raise_exception_on_error=False)
-
- else:
- self.log.debug("namespace not found")
-
- # delete cluster directory
- direct = self.fs.path + "/" + cluster_id
- self.log.debug("Removing directory {}".format(direct))
- shutil.rmtree(direct, ignore_errors=True)
-
- return True