else:
self.log.info("Helm client already initialized")
+ # sync fs with local data
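+ # (reverse_sync pushes the local cluster dir back to the shared fs so
+ # other OSM workers can see the newly initialized helm client data)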
+ self.fs.reverse_sync(from_path=cluster_id)
+
self.log.info("Cluster {} initialized".format(cluster_id))
return cluster_uuid, n2vc_installed_sw
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
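+ # (sync refreshes the local copy of the cluster dir from the shared fs
+ # before running helm against it)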
+ self.fs.sync(from_path=cluster_id)
+
# helm repo update
command = "{} --kubeconfig={} --home={} repo update".format(
self._helm_command, config_filename, helm_dir
self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(command=command, raise_exception_on_error=True)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
async def repo_list(self, cluster_uuid: str) -> list:
"""
Get the list of registered repositories
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
self._helm_command, config_filename, helm_dir
)
else:
return []
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
async def repo_remove(self, cluster_uuid: str, name: str):
"""
Remove a repository from OSM
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --home={} repo remove {}".format(
self._helm_command, config_filename, helm_dir, name
)
await self._local_async_exec(command=command, raise_exception_on_error=True)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
async def reset(
self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
- self.log.debug(
- "Resetting K8s environment. cluster uuid: {}".format(cluster_id)
- )
+ self.log.debug(
+ "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
+ cluster_id, uninstall_sw
+ )
+ )
# get kube and helm directories
_kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
cluster_name=cluster_id, create_if_not_exist=False
)
- # uninstall releases if needed
- releases = await self.instances_list(cluster_uuid=cluster_uuid)
- if len(releases) > 0:
- if force:
- for r in releases:
- try:
- kdu_instance = r.get("Name")
- chart = r.get("Chart")
- self.log.debug(
- "Uninstalling {} -> {}".format(chart, kdu_instance)
- )
- await self.uninstall(
- cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
- )
- except Exception as e:
- self.log.error(
- "Error uninstalling release {}: {}".format(kdu_instance, e)
- )
- else:
- msg = (
- "Cluster has releases and not force. Cannot reset K8s "
- "environment. Cluster uuid: {}"
- ).format(cluster_id)
- self.log.error(msg)
- raise K8sException(msg)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ # uninstall releases if needed.
+ if uninstall_sw:
+ releases = await self.instances_list(cluster_uuid=cluster_uuid)
+ if len(releases) > 0:
+ if force:
+ for r in releases:
+ try:
+ kdu_instance = r.get("Name")
+ chart = r.get("Chart")
+ self.log.debug(
+ "Uninstalling {} -> {}".format(chart, kdu_instance)
+ )
+ await self.uninstall(
+ cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
+ )
+ except Exception as e:
+ self.log.error(
+ "Error uninstalling release {}: {}".format(kdu_instance, e)
+ )
+ else:
+ msg = (
+ "Cluster uuid: {} has releases and force is not set. "
+ "Leaving K8s helm environment in place"
+ ).format(cluster_id)
+ self.log.warning(msg)
+ uninstall_sw = False  # allow removing the k8s cluster without uninstalling Tiller
if uninstall_sw:
self.log.debug("namespace not found")
# delete cluster directory
+ self.log.debug("Removing directory {}".format(cluster_id))
+ self.fs.file_delete(cluster_id, ignore_non_exist=True)
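+ # (file_delete removes the cluster dir from the shared fs; the local
+ # copy is removed below)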
+ # Also remove the local directory if it still exists
direct = self.fs.path + "/" + cluster_id
- self.log.debug("Removing directory {}".format(direct))
shutil.rmtree(direct, ignore_errors=True)
return True
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.log.debug("sync cluster_id: {}".format(_cluster_dir))
+ self.fs.sync(from_path=cluster_id)
+
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
params_str, file_to_delete = self._params_to_file_option(
self.log.error(msg)
raise K8sException(msg)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
self.log.debug("Returning kdu_instance {}".format(kdu_instance))
return kdu_instance
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --home={} list --output yaml".format(
self._helm_command, config_filename, helm_dir
)
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
# params to str
# params_str = K8sHelmConnector._params_to_set_option(params)
params_str, file_to_delete = self._params_to_file_option(
self.log.error(msg)
raise K8sException(msg)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
# return new revision number
instance = await self.get_instance_info(
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
self._helm_command, config_filename, helm_dir, kdu_instance, revision
)
self.log.error(msg)
raise K8sException(msg)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
# return new revision number
instance = await self.get_instance_info(
cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
cluster_name=cluster_id, create_if_not_exist=True
)
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
command = "{} --kubeconfig={} --home={} delete --purge {}".format(
self._helm_command, config_filename, helm_dir, kdu_instance
)
command=command, raise_exception_on_error=True
)
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
return self._output_to_table(output)
async def exec_primitive(
return_text=True,
)
+ async def get_services(
+ self, cluster_uuid: str, kdu_instance: str, namespace: str
+ ) -> list:
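+ """
+ Obtain a list of services deployed by the given kdu instance
+
+ :param cluster_uuid: UUID of the K8s cluster
+ :param kdu_instance: unique name of the KDU instance
+ :param namespace: K8s namespace used by the KDU instance
+ :return: list of service dicts, one per service found in the kdu status
+ """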
+
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ self.log.debug(
+ "get_services: cluster_uuid: {}, kdu_instance: {}".format(
+ cluster_uuid, kdu_instance
+ )
+ )
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ status = await self._status_kdu(
+ cluster_id, kdu_instance, return_text=False
+ )
+
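+ # extract the names of the v1/Service resources from the helm status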
+ service_names = self._parse_helm_status_service_info(status)
+ service_list = []
+ for service_name in service_names:
+ service = await self.get_service(cluster_uuid, service_name, namespace)
+ service_list.append(service)
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ return service_list
+
+ async def get_service(
+ self, cluster_uuid: str, service_name: str, namespace: str
+ ) -> object:
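+ """
+ Obtain the data of a single K8s service
+
+ :param cluster_uuid: UUID of the K8s cluster
+ :param service_name: name of the K8s service
+ :param namespace: K8s namespace where the service lives
+ :return: dict with name, type, ports and cluster_ip of the service, plus
+ external_ip when the service is of type LoadBalancer
+ """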
+
+ self.log.debug(
+ "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
+ service_name, namespace, cluster_uuid
+ )
+ )
+
+ # get paths
+ _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
+ _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
+ cluster_name=cluster_id, create_if_not_exist=True
+ )
+
+ # sync local dir
+ self.fs.sync(from_path=cluster_id)
+
+ command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
+ self.kubectl_command, config_filename, namespace, service_name
+ )
+
+ output, _rc = await self._local_async_exec(
+ command=command, raise_exception_on_error=True
+ )
+
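+ # parse the service definition from the kubectl yaml output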
+ data = yaml.load(output, Loader=yaml.SafeLoader)
+
+ service = {
+ "name": service_name,
+ "type": self._get_deep(data, ("spec", "type")),
+ "ports": self._get_deep(data, ("spec", "ports")),
+ "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
+ }
+ if service["type"] == "LoadBalancer":
+ ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
+ ip_list = [elem["ip"] for elem in ip_map_list]
+ service["external_ip"] = ip_list
+
+ # sync fs
+ self.fs.reverse_sync(from_path=cluster_id)
+
+ return service
+
async def synchronize_repos(self, cluster_uuid: str):
_, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
self.log.debug("Task cancelled")
return
except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)))
+ self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
pass
finally:
if run_once:
return ready
+ def _parse_helm_status_service_info(self, status):
+
+ # extract info.status.resources-> str
+ # format:
+ # ==> v1/Deployment
+ # NAME READY UP-TO-DATE AVAILABLE AGE
+ # halting-horse-mongodb 0/1 1 0 0s
+ # halting-petit-mongodb 1/1 1 0 0s
+ # blank line
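+ # followed by further sections, e.g. (hypothetical output):
+ # ==> v1/Service
+ # NAME                    TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)     AGE
+ # halting-horse-mongodb   ClusterIP   10.152.183.79  <none>        27017/TCP   0s
+ # only the NAME column of the "==> v1/Service" section is collected below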
+ resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
+
+ # convert the multi-line string into a table: one list of tokens per line
+ resources = K8sHelmConnector._output_to_table(resources)
+
+ service_list = []
+ first_line_skipped = service_found = False
+ for line in resources:
+ if not service_found:
+ if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
+ service_found = True
+ continue
+ else:
+ if len(line) >= 2 and line[0] == "==>":
+ service_found = first_line_skipped = False
+ continue
+ if not line:
+ continue
+ if not first_line_skipped:
+ first_line_skipped = True
+ continue
+ service_list.append(line[0])
+
+ return service_list
+
@staticmethod
def _get_deep(dictionary: dict, members: tuple):
target = dictionary