################################### P U B L I C ####################################
####################################################################################
"""
+ service_account = "osm"
def __init__(
self,
self.log.info("K8S Helm connector initialized")
+ @staticmethod
+ def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
+ """
+ Parses a cluster_uuid stored in the database, which can be either
+ 'namespace:cluster_id' or only cluster_id for backward compatibility.
+
+ :param cluster_uuid: raw identifier, optionally prefixed with 'namespace:'
+ :return: tuple (namespace, cluster_id); namespace is '' when the input
+ has no ':' separator (rpartition yields ('', '', cluster_uuid))
+ """
+ namespace, _, cluster_id = cluster_uuid.rpartition(':')
+ return namespace, cluster_id
+
async def init_env(
self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
) -> (str, bool):
(on error, an exception will be raised)
"""
- cluster_uuid = reuse_cluster_uuid
- if not cluster_uuid:
- cluster_uuid = str(uuid4())
+ if reuse_cluster_uuid:
+ namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
+ namespace = namespace_ or namespace
+ else:
+ cluster_id = str(uuid4())
+ cluster_uuid = "{}:{}".format(namespace, cluster_id)
- self.log.debug("Initializing K8S environment. namespace: {}".format(namespace))
+ self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
# create config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
- f = open(config_filename, "w")
- f.write(k8s_creds)
- f.close()
+ with open(config_filename, "w") as f:
+ f.write(k8s_creds)
# check if tiller pod is up in cluster
command = "{} --kubeconfig={} --namespace={} get deployments".format(
command=command, raise_exception_on_error=True
)
- output_table = K8sHelmConnector._output_to_table(output=output)
+ output_table = self._output_to_table(output=output)
# find 'tiller' pod in all pods
already_initialized = False
n2vc_installed_sw = False
if not already_initialized:
self.log.info(
- "Initializing helm in client and server: {}".format(cluster_uuid)
- )
- command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format(
- self._helm_command, config_filename, namespace, helm_dir
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True
+ "Initializing helm in client and server: {}".format(cluster_id)
)
+ command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
+ self.kubectl_command, config_filename, self.service_account)
+ _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+ command = ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
+ "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
+ ).format(self.kubectl_command, config_filename, self.service_account)
+ _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=False)
+
+ command = ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
+ "init").format(self._helm_command, config_filename, namespace, helm_dir,
+ self.service_account)
+ _, _rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
n2vc_installed_sw = True
else:
# check client helm installation
check_file = helm_dir + "/repository/repositories.yaml"
- if not self._check_file_exists(
- filename=check_file, exception_if_not_exists=False
- ):
- self.log.info("Initializing helm in client: {}".format(cluster_uuid))
+ if not self._check_file_exists(filename=check_file, exception_if_not_exists=False):
+ self.log.info("Initializing helm in client: {}".format(cluster_id))
command = (
"{} --kubeconfig={} --tiller-namespace={} "
"--home={} init --client-only"
else:
self.log.info("Helm client already initialized")
- self.log.info("Cluster initialized {}".format(cluster_uuid))
+ self.log.info("Cluster {} initialized".format(cluster_id))
return cluster_uuid, n2vc_installed_sw
async def repo_add(
self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
-
- self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
+ cluster_id, repo_type, name, url))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# helm repo update
:return: list of registered repositories: [ (name, url) .... ]
"""
- self.log.debug("list repositories for cluster {}".format(cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list repositories for cluster {}".format(cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
"""
Remove a repository from OSM
- :param cluster_uuid: the cluster
+ :param cluster_uuid: the cluster or 'namespace:cluster'
:param name: repo name in OSM
:return: True if successful
"""
- self.log.debug("list repositories for cluster {}".format(cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list repositories for cluster {}".format(cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} repo remove {}".format(
self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
- self.log.debug(
- "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid)
- )
+ namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
+ .format(cluster_id, uninstall_sw))
# get kube and helm directories
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=False
+ cluster_name=cluster_id, create_if_not_exist=False
)
- # uninstall releases if needed
- releases = await self.instances_list(cluster_uuid=cluster_uuid)
- if len(releases) > 0:
- if force:
- for r in releases:
- try:
- kdu_instance = r.get("Name")
- chart = r.get("Chart")
- self.log.debug(
- "Uninstalling {} -> {}".format(chart, kdu_instance)
- )
- await self.uninstall(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- except Exception as e:
- self.log.error(
- "Error uninstalling release {}: {}".format(kdu_instance, e)
- )
- else:
- msg = (
- "Cluster has releases and not force. Cannot reset K8s "
- "environment. Cluster uuid: {}"
- ).format(cluster_uuid)
- self.log.error(msg)
- raise K8sException(msg)
-
+ # uninstall releases if needed.
if uninstall_sw:
+ releases = await self.instances_list(cluster_uuid=cluster_uuid)
+ if len(releases) > 0:
+ if force:
+ for r in releases:
+ try:
+ kdu_instance = r.get("Name")
+ chart = r.get("Chart")
+ self.log.debug(
+ "Uninstalling {} -> {}".format(chart, kdu_instance)
+ )
+ await self.uninstall(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
+ except Exception as e:
+ self.log.error(
+ "Error uninstalling release {}: {}".format(kdu_instance, e)
+ )
+ else:
+ msg = (
+ "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
+ ).format(cluster_id)
+ self.log.warn(msg)
+ uninstall_sw = False # Allow to remove k8s cluster without removing Tiller
- self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid))
-
- # find namespace for tiller pod
- command = "{} --kubeconfig={} get deployments --all-namespaces".format(
- self.kubectl_command, config_filename
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False
- )
- output_table = K8sHelmConnector._output_to_table(output=output)
- namespace = None
- for r in output_table:
- try:
- if "tiller-deploy" in r[1]:
- namespace = r[0]
- break
- except Exception:
- pass
- else:
- msg = "Tiller deployment not found in cluster {}".format(cluster_uuid)
- self.log.error(msg)
-
- self.log.debug("namespace for tiller: {}".format(namespace))
+ if uninstall_sw:
- force_str = "--force"
+ self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
- if namespace:
- # delete tiller deployment
- self.log.debug(
- "Deleting tiller deployment for cluster {}, namespace {}".format(
- cluster_uuid, namespace
- )
+ if not namespace:
+ # find namespace for tiller pod
+ command = "{} --kubeconfig={} get deployments --all-namespaces".format(
+ self.kubectl_command, config_filename
)
- command = (
- "{} --namespace {} --kubeconfig={} {} delete deployment "
- "tiller-deploy"
- ).format(self.kubectl_command, namespace, config_filename, force_str)
- await self._local_async_exec(
+ output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=False
)
+ output_table = K8sHelmConnector._output_to_table(output=output)
+ namespace = None
+ for r in output_table:
+ try:
+ if "tiller-deploy" in r[1]:
+ namespace = r[0]
+ break
+ except Exception:
+ pass
+ else:
+ msg = "Tiller deployment not found in cluster {}".format(cluster_id)
+ self.log.error(msg)
+
+ self.log.debug("namespace for tiller: {}".format(namespace))
+ if namespace:
# uninstall tiller from cluster
self.log.debug(
- "Uninstalling tiller from cluster {}".format(cluster_uuid)
+ "Uninstalling tiller from cluster {}".format(cluster_id)
)
command = "{} --kubeconfig={} --home={} reset".format(
self._helm_command, config_filename, helm_dir
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True
)
+ # Delete clusterrolebinding and serviceaccount.
+ # Ignore if errors for backward compatibility
+ command = ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
+ "io/osm-tiller-cluster-rule").format(self.kubectl_command,
+ config_filename)
+ output, _rc = await self._local_async_exec(command=command,
+ raise_exception_on_error=False)
+ command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
+ format(self.kubectl_command, config_filename, self.service_account)
+ output, _rc = await self._local_async_exec(command=command,
+ raise_exception_on_error=False)
+
else:
self.log.debug("namespace not found")
# delete cluster directory
- direct = self.fs.path + "/" + cluster_uuid
+ direct = self.fs.path + "/" + cluster_id
self.log.debug("Removing directory {}".format(direct))
shutil.rmtree(direct, ignore_errors=True)
namespace: str = None,
):
- self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
params_str, file_to_delete = self._params_to_file_option(
- cluster_uuid=cluster_uuid, params=params
+ cluster_id=cluster_id, params=params
)
timeout_str = ""
kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
try:
result = await self._status_kdu(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
show_error_log=False,
)
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="install",
# write final status
await self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="install",
"""
returns a list of deployed releases in a cluster
- :param cluster_uuid: the cluster
+ :param cluster_uuid: the 'cluster' or 'namespace:cluster'
:return:
"""
- self.log.debug("list releases for cluster {}".format(cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("list releases for cluster {}".format(cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} list --output yaml".format(
db_dict: dict = None,
):
- self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
params_str, file_to_delete = self._params_to_file_option(
- cluster_uuid=cluster_uuid, params=params
+ cluster_id=cluster_id, params=params
)
timeout_str = ""
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="upgrade",
# write final status
await self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="upgrade",
self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"rollback kdu_instance {} to revision {} from cluster {}".format(
- kdu_instance, revision, cluster_uuid
+ kdu_instance, revision, cluster_id
)
)
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
# write status in another task
status_task = asyncio.ensure_future(
coro_or_future=self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="rollback",
# write final status
await self._store_status(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
db_dict=db_dict,
operation="rollback",
(this call would happen after all _terminate-config-primitive_ of the VNF
are invoked).
- :param cluster_uuid: UUID of a K8s cluster known by OSM
+ :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
:param kdu_instance: unique name for the KDU instance to be deleted
:return: True if successful
"""
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug(
"uninstall kdu_instance {} from cluster {}".format(
- kdu_instance, cluster_uuid
+ kdu_instance, cluster_id
)
)
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} delete --purge {}".format(
) -> str:
"""Exec primitive (Juju action)
- :param cluster_uuid str: The UUID of the cluster
+ :param cluster_uuid str: The UUID of the cluster or namespace:cluster
:param kdu_instance str: The unique name of the KDU instance
:param primitive_name: Name of action that will be executed
:param timeout: Timeout for action execution
async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
# call internal function
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
return await self._status_kdu(
- cluster_uuid=cluster_uuid,
+ cluster_id=cluster_id,
kdu_instance=kdu_instance,
show_error_log=True,
return_text=True,
)
+ async def get_services(self,
+ cluster_uuid: str,
+ kdu_instance: str,
+ namespace: str) -> list:
+ """
+ Return the services deployed by a KDU instance.
+
+ Obtains the helm status of the release, extracts the service names from
+ it and resolves each name to its service data via get_service().
+
+ :param cluster_uuid: cluster id, optionally as 'namespace:cluster_id'
+ :param kdu_instance: helm release name of the KDU instance
+ :param namespace: k8s namespace used when querying each service
+ :return: list of service-info dicts as built by get_service()
+ """
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug(
+ "get_services: cluster_uuid: {}, kdu_instance: {}".format(
+ cluster_uuid, kdu_instance
+ )
+ )
+
+ # helm status parsed as a dict (return_text=False)
+ status = await self._status_kdu(
+ cluster_id, kdu_instance, return_text=False
+ )
+
+ service_names = self._parse_helm_status_service_info(status)
+ service_list = []
+ for service in service_names:
+ service = await self.get_service(cluster_uuid, service, namespace)
+ service_list.append(service)
+
+ return service_list
+
+ async def get_service(self,
+ cluster_uuid: str,
+ service_name: str,
+ namespace: str) -> object:
+ """
+ Obtain the data of a single k8s service through kubectl.
+
+ :param cluster_uuid: cluster id, optionally as 'namespace:cluster_id'
+ :param service_name: name of the k8s service to query
+ :param namespace: k8s namespace where the service lives
+ :return: dict with keys name, type, ports, cluster_ip and, for
+ LoadBalancer services, external_ip (list of ingress IPs)
+ """
+
+ self.log.debug(
+ "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
+ service_name, namespace, cluster_uuid)
+ )
+
+ # get paths
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
+ self.kubectl_command, config_filename, namespace, service_name
+ )
+
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
+
+ # kubectl -o=yaml output parsed safely
+ data = yaml.load(output, Loader=yaml.SafeLoader)
+
+ service = {
+ "name": service_name,
+ "type": self._get_deep(data, ("spec", "type")),
+ "ports": self._get_deep(data, ("spec", "ports")),
+ "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+ }
+ if service["type"] == "LoadBalancer":
+ # NOTE(review): assumes status.loadBalancer.ingress is a present,
+ # iterable list of {"ip": ...} entries -- confirm behavior while the
+ # external IP is still pending (ingress may be absent then)
+ ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
+ ip_list = [elem["ip"] for elem in ip_map_list]
+ service["external_ip"] = ip_list
+
+ return service
+
async def synchronize_repos(self, cluster_uuid: str):
- self.log.debug("syncronize repos for cluster helm-id: {}",)
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug("syncronize repos for cluster helm-id: {}".format(cluster_id))
try:
update_repos_timeout = (
300 # max timeout to sync a single repos, more than this is too much
# elements that must be deleted
deleted_repo_list = []
added_repo_dict = {}
- self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
- self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
+ # self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
+ # self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
# obtain repos to add: registered by nbi but not added
repos_to_add = [
# delete repos: must delete first then add because there may be
# different repos with same name but
# different id and url
- self.log.debug("repos to delete: {}".format(repos_to_delete))
+ if repos_to_delete:
+ self.log.debug("repos to delete: {}".format(repos_to_delete))
for repo_id in repos_to_delete:
# try to delete repos
try:
deleted_repo_list.append(repo_id)
# add repos
- self.log.debug("repos to add: {}".format(repos_to_add))
+ if repos_to_add:
+ self.log.debug("repos to add: {}".format(repos_to_add))
for repo_id in repos_to_add:
# obtain the repo data from the db
# if there is an error getting the repo in the database we will
async def _status_kdu(
self,
- cluster_uuid: str,
+ cluster_id: str,
kdu_instance: str,
show_error_log: bool = False,
return_text: bool = False,
# config filename
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_uuid, create_if_not_exist=True
+ cluster_name=cluster_id, create_if_not_exist=True
)
command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
async def _store_status(
self,
- cluster_uuid: str,
+ cluster_id: str,
operation: str,
kdu_instance: str,
check_every: float = 10,
db_dict: dict = None,
run_once: bool = False,
):
+ previous_exception = None
while True:
try:
await asyncio.sleep(check_every)
- detailed_status = await self.status_kdu(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ detailed_status = await self._status_kdu(
+ cluster_id=cluster_id, kdu_instance=kdu_instance,
+ return_text=False
)
status = detailed_status.get("info").get("Description")
- self.log.debug("STATUS:\n{}".format(status))
- self.log.debug("DETAILED STATUS:\n{}".format(detailed_status))
+ self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
# write status to db
result = await self.write_app_status_to_db(
db_dict=db_dict,
self.log.debug("Task cancelled")
return
except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)))
- pass
+ # log only once in the while loop
+ if str(previous_exception) != str(e):
+ self.log.debug("_store_status exception: {}".format(str(e)))
+ previous_exception = e
finally:
if run_once:
return
- async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:
+ async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
status = await self._status_kdu(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False
+ cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
)
# extract info.status.resources-> str
return ready
+ def _parse_helm_status_service_info(self, status):
+ """
+ Extract the k8s service names from a helm status dict.
+
+ Walks info.status.resources looking for the '==> v1/Service' section,
+ skips that section's header row, and collects the first column (the
+ service name) of every remaining row until the next '==>' marker.
+
+ NOTE(review): despite the inherited '-> str' comment below, rows are
+ indexed as line[0]/line[1], so resources is expected to be a table
+ (iterable of token lists), not a raw string -- confirm against the
+ _status_kdu/_get_deep output format.
+
+ :param status: helm status already parsed into a dict
+ :return: list of service names (possibly empty)
+ """
+
+ # extract info.status.resources-> str
+ # format:
+ # ==> v1/Deployment
+ # NAME READY UP-TO-DATE AVAILABLE AGE
+ # halting-horse-mongodb 0/1 1 0 0s
+ # halting-petit-mongodb 1/1 1 0 0s
+ # blank line
+ resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
+
+ service_list = []
+ first_line_skipped = service_found = False
+ for line in resources:
+ if not service_found:
+ # searching for the start of the v1/Service section
+ if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
+ service_found = True
+ continue
+ else:
+ # a new '==>' marker ends the v1/Service section
+ if len(line) >= 2 and line[0] == "==>":
+ service_found = first_line_skipped = False
+ continue
+ if not line:
+ continue
+ if not first_line_skipped:
+ # skip the NAME/TYPE/... header row of the section
+ first_line_skipped = True
+ continue
+ service_list.append(line[0])
+
+ return service_list
+
@staticmethod
def _get_deep(dictionary: dict, members: tuple):
target = dictionary
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
+ def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
if params and len(params) > 0:
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
+ self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)
def get_random_number():
r = random.randrange(start=1, stop=99999999)