+
+ def _split_version(self, kdu_model: str) -> (str, str):
+ version = None
+ if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
+ parts = kdu_model.split(sep=":")
+ if len(parts) == 2:
+ version = str(parts[1])
+ kdu_model = parts[0]
+ return kdu_model, version
+
+ def _split_repo(self, kdu_model: str) -> (str, str):
+ """Obtain the Helm Chart's repository and Chart's names from the KDU model
+
+ Args:
+ kdu_model (str): Associated KDU model
+
+ Returns:
+ (str, str): Tuple with the Chart name in index 0, and the repo name
+ in index 2; if there was a problem finding them, return None
+ for both
+ """
+
+ chart_name = None
+ repo_name = None
+
+ idx = kdu_model.find("/")
+ if idx >= 0:
+ chart_name = kdu_model[idx + 1 :]
+ repo_name = kdu_model[:idx]
+
+ return chart_name, repo_name
+
+ async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
+ """Obtain the Helm repository for an Helm Chart
+
+ Args:
+ kdu_model (str): the KDU model associated with the Helm Chart instantiation
+ cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
+
+ Returns:
+ str: the repository URL; if Helm Chart is a local one, the function returns None
+ """
+
+ _, repo_name = self._split_repo(kdu_model=kdu_model)
+
+ repo_url = None
+ if repo_name:
+ # Find repository link
+ local_repo_list = await self.repo_list(cluster_uuid)
+ for repo in local_repo_list:
+ if repo["name"] == repo_name:
+ repo_url = repo["url"]
+ break # it is not necessary to continue the loop if the repo link was found...
+
+ return repo_url
+
+ async def create_certificate(
+ self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
+ ):
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_certificate(
+ namespace=namespace,
+ name=name,
+ dns_prefix=dns_prefix,
+ secret_name=secret_name,
+ usages=[usage],
+ issuer_name="ca-issuer",
+ )
+
+ async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_certificate(namespace, certificate_name)
+
+ async def create_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ labels,
+ ):
+ """
+ Create a namespace in a specific cluster
+
+ :param namespace: Namespace to be created
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param labels: Dictionary with labels for the new namespace
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_namespace(
+ name=namespace,
+ labels=labels,
+ )
+
+ async def delete_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ ):
+ """
+ Delete a namespace in a specific cluster
+
+ :param namespace: namespace to be deleted
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_namespace(
+ name=namespace,
+ )
+
+ async def copy_secret_data(
+ self,
+ src_secret: str,
+ dst_secret: str,
+ cluster_uuid: str,
+ data_key: str,
+ src_namespace: str = "osm",
+ dst_namespace: str = "osm",
+ ):
+ """
+ Copy a single key and value from an existing secret to a new one
+
+ :param src_secret: name of the existing secret
+ :param dst_secret: name of the new secret
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param data_key: key of the existing secret to be copied
+ :param src_namespace: Namespace of the existing secret
+ :param dst_namespace: Namespace of the new secret
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ secret_data = await kubectl.get_secret_content(
+ name=src_secret,
+ namespace=src_namespace,
+ )
+ # Only the corresponding data_key value needs to be copy
+ data = {data_key: secret_data.get(data_key)}
+ await kubectl.create_secret(
+ name=dst_secret,
+ data=data,
+ namespace=dst_namespace,
+ secret_type="Opaque",
+ )
+
+ async def setup_default_rbac(
+ self,
+ name,
+ namespace,
+ cluster_uuid,
+ api_groups,
+ resources,
+ verbs,
+ service_account,
+ ):
+ """
+ Create a basic RBAC for a new namespace.
+
+ :param name: name of both Role and Role Binding
+ :param namespace: K8s namespace
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param api_groups: Api groups to be allowed in Policy Rule
+ :param resources: Resources to be allowed in Policy Rule
+ :param verbs: Verbs to be allowed in Policy Rule
+ :param service_account: Service Account name used to bind the Role
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_role(
+ name=name,
+ labels={},
+ namespace=namespace,
+ api_groups=api_groups,
+ resources=resources,
+ verbs=verbs,
+ )
+ await kubectl.create_role_binding(
+ name=name,
+ labels={},
+ namespace=namespace,
+ role_name=name,
+ sa_name=service_account,
+ )