################################### P U B L I C ####################################
####################################################################################
"""
+ service_account = "osm"
def __init__(
self,
self.log.info("K8S Helm connector initialized")
+ @staticmethod
+ def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
+ """
+ Parses the cluster_uuid stored in the database, which can be either 'namespace:cluster_id'
+ or, for backward compatibility, just cluster_id
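+ e.g. "kube-system:myid" -> ("kube-system", "myid"); "myid" -> ("", "myid")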
+ """
+ namespace, _, cluster_id = cluster_uuid.rpartition(':')
+ return namespace, cluster_id
+
async def init_env(
self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
) -> (str, bool):
(on error, an exception will be raised)
"""
- cluster_uuid = reuse_cluster_uuid
- if not cluster_uuid:
- cluster_uuid = str(uuid4())
+ if reuse_cluster_uuid:
+ namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
+ namespace = namespace_ or namespace
+ else:
+ cluster_id = str(uuid4())
+ cluster_uuid = "{}:{}".format(namespace, cluster_id)
- self.log.debug("Initializing K8S environment. namespace: {}".format(namespace))
+ self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
# create config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
with open(config_filename, "w") as f:
f.write(k8s_creds)
n2vc_installed_sw = False
if not already_initialized:
self.log.info(
- "Initializing helm in client and server: {}".format(cluster_uuid)
- )
- command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format(
- self._helm_command, config_filename, namespace, helm_dir
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
+ "Initializing helm in client and server: {}".format(cluster_id)
)
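+ # create a dedicated tiller service account in kube-system; errors (e.g. it already exists) are ignored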
+ command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
+ self.kubectl_command, config_filename, self.service_account)
+ _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+
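+ # bind the service account to cluster-admin via a clusterrolebinding so tiller can manage the whole cluster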
+ command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
+ "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
+ ).format(self.kubectl_command, config_filename, self.service_account)
+ _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+
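+ # initialize helm/tiller in the target namespace using the service account created above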
+ command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
+ "init").format(self._helm_command, config_filename, namespace, helm_dir,
+ self.service_account)
+ _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
n2vc_installed_sw = True
else:
# check client helm installation
check_file = helm_dir + "/repository/repositories.yaml"
- if not self._check_file_exists(
- filename=check_file, exception_if_not_exists=False
- ):
- self.log.info("Initializing helm in client: {}".format(cluster_uuid))
+ if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
+ self.log.info("Initializing helm in client: {}".format(cluster_id))
command = (
"{} --kubeconfig={} --tiller-namespace={} "
"--home={} init --client-only"
else:
self.log.info("Helm client already initialized")
- self.log.info("Cluster initialized {}".format(cluster_uuid))
+ self.log.info("Cluster {} initialized".format(cluster_id))
return cluster_uuid, n2vc_installed_sw
async def repo_add(
self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
-
- self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
+ cluster_id, repo_type, name, url))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# helm repo update
:return: list of registered repositories: [ (name, url), ... ]
"""
- self.log.debug("list repositories for cluster {}".format(cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list repositories for cluster {}".format(cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
"""
Remove a repository from OSM
- :param cluster_uuid: the cluster
+ :param cluster_uuid: the cluster or 'namespace:cluster'
:param name: repo name in OSM
:return: True if successful
"""
- self.log.debug("list repositories for cluster {}".format(cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list repositories for cluster {}".format(cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} repo remove {}".format(
self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
+ namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
- "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid)
+ "Resetting K8s environment. cluster uuid: {}".format(cluster_id)
)
# get kube and helm directories
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=False
+ cluster_name=cluster_id, create_if_not_exist=False
)
# uninstall releases if needed
msg = (
"Cluster has releases and not force. Cannot reset K8s "
"environment. Cluster uuid: {}"
- ).format(cluster_uuid)
+ ).format(cluster_id)
self.log.error(msg)
raise K8sException(msg)
if uninstall_sw:
- self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid))
-
- # find namespace for tiller pod
- command = "{} --kubeconfig={} get deployments --all-namespaces".format(
- self.kubectl_command, config_filename
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- output_table = K8sHelmConnector._output_to_table(output=output)
- namespace = None
- for r in output_table:
- try:
- if "tiller-deploy" in r[1]:
- namespace = r[0]
- break
- except Exception:
- pass
- else:
- msg = "Tiller deployment not found in cluster {}".format(cluster_uuid)
- self.log.error(msg)
-
- self.log.debug("namespace for tiller: {}".format(namespace))
-
- force_str = "--force"
+ self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
- if namespace:
- # delete tiller deployment
- self.log.debug(
- "Deleting tiller deployment for cluster {}, namespace {}".format(
- cluster_uuid, namespace
- )
+ if not namespace:
+ # find namespace for tiller pod
+ command = "{} --kubeconfig={} get deployments --all-namespaces".format(
+ self.kubectl_command, config_filename
)
- command = (
- "{} --namespace {} --kubeconfig={} {} delete deployment "
- "tiller-deploy"
- ).format(self.kubectl_command, namespace, config_filename, force_str)
- await self._local_async_exec(
+ output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=False
)
+ output_table = K8sHelmConnector._output_to_table(output=output)
+ namespace = None
+ for r in output_table:
+ try:
+ if "tiller-deploy" in r[1]:
+ namespace = r[0]
+ break
+ except Exception:
+ pass
+ else:
+ msg = "Tiller deployment not found in cluster {}".format(cluster_id)
+ self.log.error(msg)
+ self.log.debug("namespace for tiller: {}".format(namespace))
+
+ if namespace:
# uninstall tiller from cluster
self.log.debug(
- "Uninstalling tiller from cluster {}".format(cluster_uuid)
+ "Uninstalling tiller from cluster {}".format(cluster_id)
)
command = "{} --kubeconfig={} --home={} reset".format(
self._helm_command, config_filename, helm_dir
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True
)
+ # Delete the clusterrolebinding and serviceaccount.
+ # Errors are ignored for backward compatibility
+ command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
+ "io/osm-tiller-cluster-rule").format(self.kubectl_command,
+ config_filename)
+ output, _rc = await self._local_async_exec(command=command,
+ raise_exception_on_error=False)
+ command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
+ format(self.kubectl_command, config_filename, self.service_account)
+ output, _rc = await self._local_async_exec(command=command,
+ raise_exception_on_error=False)
+
else:
self.log.debug("namespace not found")
# delete cluster directory
- direct = self.fs.path + "/" + cluster_uuid
+ direct = self.fs.path + "/" + cluster_id
self.log.debug("Removing directory {}".format(direct))
shutil.rmtree(direct, ignore_errors=True)
namespace: str = None,
):
- self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
params_str, file_to_delete = self._params_to_file_option(
- cluster_uuid=cluster_uuid, params=params
+ cluster_id=cluster_id, params=params
)
timeout_str = ""
kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
try:
result = await self._status_kdu(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
show_error_log=False,
)
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="install",
# write final status
await self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="install",
"""
returns a list of deployed releases in a cluster
- :param cluster_uuid: the cluster
+ :param cluster_uuid: the cluster or 'namespace:cluster'
:return:
"""
- self.log.debug("list releases for cluster {}".format(cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list releases for cluster {}".format(cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} list --output yaml".format(
db_dict: dict = None,
):
- self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
params_str, file_to_delete = self._params_to_file_option(
- cluster_uuid=cluster_uuid, params=params
+ cluster_id=cluster_id, params=params
)
timeout_str = ""
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="upgrade",
# write final status
await self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="upgrade",
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"rollback kdu_instance {} to revision {} from cluster {}".format(
- kdu_instance, revision, cluster_uuid
+ kdu_instance, revision, cluster_id
)
)
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="rollback",
# write final status
await self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="rollback",
(this call would happen after all _terminate-config-primitive_ of the VNF
are invoked).
- :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
:param kdu_instance: unique name for the KDU instance to be deleted
:return: True if successful
"""
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_uuid
+ kdu_instance, cluster_id
)
)
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} delete --purge {}".format(
) -> str:
"""Exec primitive (Juju action)
- :param cluster_uuid str: The UUID of the cluster
+ :param cluster_uuid str: The UUID of the cluster or namespace:cluster
:param kdu_instance str: The unique name of the KDU instance
:param primitive_name: Name of action that will be executed
:param timeout: Timeout for action execution
async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
# call internal function
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
return await self._status_kdu(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
show_error_log=True,
return_text=True,
async def synchronize_repos(self, cluster_uuid: str):
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("syncronize repos for cluster helm-id: {}",)
try:
update_repos_timeout = (
async def _status_kdu(
self,
- cluster_uuid: str,
+ cluster_id: str,
kdu_instance: str,
show_error_log: bool = False,
return_text: bool = False,
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
async def _store_status(
self,
- cluster_uuid: str,
+ cluster_id: str,
operation: str,
kdu_instance: str,
check_every: float = 10,
try:
await asyncio.sleep(check_every)
detailed_status = await self._status_kdu(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance,
+ cluster_id=cluster_id, kdu_instance=kdu_instance,
return_text=False
)
status = detailed_status.get("info").get("Description")
if run_once:
return
- async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:
+ async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
status = await self._status_kdu(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False
+ cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
)
# extract info.status.resources-> str
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
+ def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
if params and len(params) > 0:
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)
def get_random_number():
r = random.randrange(start=1, stop=99999999)