- def _parse_helm_status_service_info(self, status):
-
- # extract info.status.resources-> str
- # format:
- # ==> v1/Deployment
- # NAME READY UP-TO-DATE AVAILABLE AGE
- # halting-horse-mongodb 0/1 1 0 0s
- # halting-petit-mongodb 1/1 1 0 0s
- # blank line
- resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
-
- service_list = []
- first_line_skipped = service_found = False
- for line in resources:
- if not service_found:
- if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
- service_found = True
- continue
- else:
- if len(line) >= 2 and line[0] == "==>":
- service_found = first_line_skipped = False
- continue
- if not line:
- continue
- if not first_line_skipped:
- first_line_skipped = True
- continue
- service_list.append(line[0])
-
- return service_list
-
- @staticmethod
- def _get_deep(dictionary: dict, members: tuple):
- target = dictionary
- value = None
- try:
- for m in members:
- value = target.get(m)
- if not value:
- return None
- else:
- target = value
- except Exception:
- pass
- return value
-
- # find key:value in several lines
- @staticmethod
- def _find_in_lines(p_lines: list, p_key: str) -> str:
- for line in p_lines:
- try:
- if line.startswith(p_key + ":"):
- parts = line.split(":")
- the_value = parts[1].strip()
- return the_value
- except Exception:
- # ignore it
- pass
- return None
-
- # params for use in -f file
- # returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
-
- if params and len(params) > 0:
- self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- while len(s) < 10:
- s = "0" + s
- return s
-
- params2 = dict()
- for key in params:
- value = params.get(key)
- if "!!yaml" in str(value):
- value = yaml.load(value[7:])
- params2[key] = value
-
- values_file = get_random_number() + ".yaml"
- with open(values_file, "w") as stream:
- yaml.dump(params2, stream, indent=4, default_flow_style=False)
-
- return "-f {}".format(values_file), values_file
-
- return "", None
-
- # params for use in --set option
- @staticmethod
- def _params_to_set_option(params: dict) -> str:
- params_str = ""
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += "--set "
- start = False
- else:
- params_str += ","
- params_str += "{}={}".format(key, value)
- return params_str
-
- @staticmethod
- def _output_to_lines(output: str) -> list:
- output_lines = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.strip()
- if len(line) > 0:
- output_lines.append(line)
- return output_lines
-
- @staticmethod
- def _output_to_table(output: str) -> list:
- output_table = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.replace("\t", " ")
- line_list = list()
- output_table.append(line_list)
- cells = line.split(sep=" ")
- for cell in cells:
- cell = cell.strip()
- if len(cell) > 0:
- line_list.append(cell)
- return output_table
-
- def _get_paths(
- self, cluster_name: str, create_if_not_exist: bool = False
- ) -> (str, str, str, str):
- """
- Returns kube and helm directories
-
- :param cluster_name:
- :param create_if_not_exist:
- :return: kube, helm directories, config filename and cluster dir.
- Raises exception if not exist and cannot create
- """
-
- base = self.fs.path
- if base.endswith("/") or base.endswith("\\"):
- base = base[:-1]
-
- # base dir for cluster
- cluster_dir = base + "/" + cluster_name
- if create_if_not_exist and not os.path.exists(cluster_dir):
- self.log.debug("Creating dir {}".format(cluster_dir))
- os.makedirs(cluster_dir)
- if not os.path.exists(cluster_dir):
- msg = "Base cluster dir {} does not exist".format(cluster_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- # kube dir
- kube_dir = cluster_dir + "/" + ".kube"
- if create_if_not_exist and not os.path.exists(kube_dir):
- self.log.debug("Creating dir {}".format(kube_dir))
- os.makedirs(kube_dir)
- if not os.path.exists(kube_dir):
- msg = "Kube config dir {} does not exist".format(kube_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- # helm home dir
- helm_dir = cluster_dir + "/" + ".helm"
- if create_if_not_exist and not os.path.exists(helm_dir):
- self.log.debug("Creating dir {}".format(helm_dir))
- os.makedirs(helm_dir)
- if not os.path.exists(helm_dir):
- msg = "Helm config dir {} does not exist".format(helm_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- config_filename = kube_dir + "/config"
- return kube_dir, helm_dir, config_filename, cluster_dir
-
- @staticmethod
- def _remove_multiple_spaces(strobj):
- strobj = strobj.strip()
- while " " in strobj:
- strobj = strobj.replace(" ", " ")
- return strobj
-
- def _local_exec(self, command: str) -> (str, int):
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.log.debug("Executing sync local command: {}".format(command))
- # raise exception if fails
- output = ""
- try:
- output = subprocess.check_output(
- command, shell=True, universal_newlines=True
- )
- return_code = 0
- self.log.debug(output)
- except Exception:
- return_code = 1
-
- return output, return_code