-
- return output
-
- async def _status_kdu(
- self,
- cluster_id: str,
- kdu_instance: str,
- show_error_log: bool = False,
- return_text: bool = False,
- ):
-
- self.log.debug("status of kdu_instance {}".format(kdu_instance))
-
- # config filename
- _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
- self._helm_command, config_filename, helm_dir, kdu_instance
- )
-
- output, rc = await self._local_async_exec(
- command=command,
- raise_exception_on_error=True,
- show_error_log=show_error_log,
- )
-
- if return_text:
- return str(output)
-
- if rc != 0:
- return None
-
- data = yaml.load(output, Loader=yaml.SafeLoader)
-
- # remove field 'notes'
- try:
- del data.get("info").get("status")["notes"]
- except KeyError:
- pass
-
- # parse field 'resources'
- try:
- resources = str(data.get("info").get("status").get("resources"))
- resource_table = self._output_to_table(resources)
- data.get("info").get("status")["resources"] = resource_table
- except Exception:
- pass
-
- return data
-
- async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
- instances = await self.instances_list(cluster_uuid=cluster_uuid)
- for instance in instances:
- if instance.get("Name") == kdu_instance:
- return instance
- self.log.debug("Instance {} not found".format(kdu_instance))
- return None
-
- @staticmethod
- def _generate_release_name(chart_name: str):
- # check embeded chart (file or dir)
- if chart_name.startswith("/"):
- # extract file or directory name
- chart_name = chart_name[chart_name.rfind("/") + 1 :]
- # check URL
- elif "://" in chart_name:
- # extract last portion of URL
- chart_name = chart_name[chart_name.rfind("/") + 1 :]
-
- name = ""
- for c in chart_name:
- if c.isalpha() or c.isnumeric():
- name += c
- else:
- name += "-"
- if len(name) > 35:
- name = name[0:35]
-
- # if does not start with alpha character, prefix 'a'
- if not name[0].isalpha():
- name = "a" + name
-
- name += "-"
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- s = s.rjust(10, "0")
- return s
-
- name = name + get_random_number()
- return name.lower()
-
- async def _store_status(
- self,
- cluster_id: str,
- operation: str,
- kdu_instance: str,
- check_every: float = 10,
- db_dict: dict = None,
- run_once: bool = False,
- ):
- while True:
- try:
- await asyncio.sleep(check_every)
- detailed_status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance,
- return_text=False
- )
- status = detailed_status.get("info").get("Description")
- self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
- # write status to db
- result = await self.write_app_status_to_db(
- db_dict=db_dict,
- status=str(status),
- detailed_status=str(detailed_status),
- operation=operation,
- )
- if not result:
- self.log.info("Error writing in database. Task exiting...")
- return
- except asyncio.CancelledError:
- self.log.debug("Task cancelled")
- return
- except Exception as e:
- self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
- pass
- finally:
- if run_once:
- return
-
- async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
-
- status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
- )
-
- # extract info.status.resources-> str
- # format:
- # ==> v1/Deployment
- # NAME READY UP-TO-DATE AVAILABLE AGE
- # halting-horse-mongodb 0/1 1 0 0s
- # halting-petit-mongodb 1/1 1 0 0s
- # blank line
- resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
-
- # convert to table
- resources = K8sHelmConnector._output_to_table(resources)
-
- num_lines = len(resources)
- index = 0
- while index < num_lines:
- try:
- line1 = resources[index]
- index += 1
- # find '==>' in column 0
- if line1[0] == "==>":
- line2 = resources[index]
- index += 1
- # find READY in column 1
- if line2[1] == "READY":
- # read next lines
- line3 = resources[index]
- index += 1
- while len(line3) > 1 and index < num_lines:
- ready_value = line3[1]
- parts = ready_value.split(sep="/")
- current = int(parts[0])
- total = int(parts[1])
- if current < total:
- self.log.debug("NOT READY:\n {}".format(line3))
- ready = False
- line3 = resources[index]
- index += 1
-
- except Exception:
- pass
-
- return ready
-
- def _parse_helm_status_service_info(self, status):
-
- # extract info.status.resources-> str
- # format:
- # ==> v1/Deployment
- # NAME READY UP-TO-DATE AVAILABLE AGE
- # halting-horse-mongodb 0/1 1 0 0s
- # halting-petit-mongodb 1/1 1 0 0s
- # blank line
- resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
-
- service_list = []
- first_line_skipped = service_found = False
- for line in resources:
- if not service_found:
- if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
- service_found = True
- continue
- else:
- if len(line) >= 2 and line[0] == "==>":
- service_found = first_line_skipped = False
- continue
- if not line:
- continue
- if not first_line_skipped:
- first_line_skipped = True
- continue
- service_list.append(line[0])
-
- return service_list
-
- @staticmethod
- def _get_deep(dictionary: dict, members: tuple):
- target = dictionary
- value = None
- try:
- for m in members:
- value = target.get(m)
- if not value:
- return None
- else:
- target = value
- except Exception:
- pass
- return value
-
- # find key:value in several lines
- @staticmethod
- def _find_in_lines(p_lines: list, p_key: str) -> str:
- for line in p_lines:
- try:
- if line.startswith(p_key + ":"):
- parts = line.split(":")
- the_value = parts[1].strip()
- return the_value
- except Exception:
- # ignore it
- pass
- return None
-
- # params for use in -f file
- # returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
-
- if params and len(params) > 0:
- self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- while len(s) < 10:
- s = "0" + s
- return s
-
- params2 = dict()
- for key in params:
- value = params.get(key)
- if "!!yaml" in str(value):
- value = yaml.load(value[7:])
- params2[key] = value
-
- values_file = get_random_number() + ".yaml"
- with open(values_file, "w") as stream:
- yaml.dump(params2, stream, indent=4, default_flow_style=False)
-
- return "-f {}".format(values_file), values_file
-
- return "", None
-
- # params for use in --set option
- @staticmethod
- def _params_to_set_option(params: dict) -> str:
- params_str = ""
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += "--set "
- start = False
- else:
- params_str += ","
- params_str += "{}={}".format(key, value)
- return params_str
-
- @staticmethod
- def _output_to_lines(output: str) -> list:
- output_lines = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.strip()
- if len(line) > 0:
- output_lines.append(line)
- return output_lines
-
- @staticmethod
- def _output_to_table(output: str) -> list:
- output_table = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.replace("\t", " ")
- line_list = list()
- output_table.append(line_list)
- cells = line.split(sep=" ")
- for cell in cells:
- cell = cell.strip()
- if len(cell) > 0:
- line_list.append(cell)
- return output_table
-
- def _get_paths(
- self, cluster_name: str, create_if_not_exist: bool = False
- ) -> (str, str, str, str):
- """
- Returns kube and helm directories
-
- :param cluster_name:
- :param create_if_not_exist:
- :return: kube, helm directories, config filename and cluster dir.
- Raises exception if not exist and cannot create
- """
-
- base = self.fs.path
- if base.endswith("/") or base.endswith("\\"):
- base = base[:-1]
-
- # base dir for cluster
- cluster_dir = base + "/" + cluster_name
- if create_if_not_exist and not os.path.exists(cluster_dir):
- self.log.debug("Creating dir {}".format(cluster_dir))
- os.makedirs(cluster_dir)
- if not os.path.exists(cluster_dir):
- msg = "Base cluster dir {} does not exist".format(cluster_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- # kube dir
- kube_dir = cluster_dir + "/" + ".kube"
- if create_if_not_exist and not os.path.exists(kube_dir):
- self.log.debug("Creating dir {}".format(kube_dir))
- os.makedirs(kube_dir)
- if not os.path.exists(kube_dir):
- msg = "Kube config dir {} does not exist".format(kube_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- # helm home dir
- helm_dir = cluster_dir + "/" + ".helm"
- if create_if_not_exist and not os.path.exists(helm_dir):
- self.log.debug("Creating dir {}".format(helm_dir))
- os.makedirs(helm_dir)
- if not os.path.exists(helm_dir):
- msg = "Helm config dir {} does not exist".format(helm_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- config_filename = kube_dir + "/config"
- return kube_dir, helm_dir, config_filename, cluster_dir
-
- @staticmethod
- def _remove_multiple_spaces(strobj):
- strobj = strobj.strip()
- while " " in strobj:
- strobj = strobj.replace(" ", " ")
- return strobj
-
- def _local_exec(self, command: str) -> (str, int):
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.log.debug("Executing sync local command: {}".format(command))
- # raise exception if fails
- output = ""
- try:
- output = subprocess.check_output(
- command, shell=True, universal_newlines=True
- )
- return_code = 0
- self.log.debug(output)
- except Exception:
- return_code = 1
-
- return output, return_code
-
- async def _local_async_exec(
- self,
- command: str,
- raise_exception_on_error: bool = False,
- show_error_log: bool = True,
- encode_utf8: bool = False,
- ) -> (str, int):
-
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.log.debug("Executing async local command: {}".format(command))
-
- # split command
- command = command.split(sep=" ")
-
- try:
- process = await asyncio.create_subprocess_exec(
- *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
- )
-
- # wait for command terminate
- stdout, stderr = await process.communicate()
-
- return_code = process.returncode
-
- output = ""
- if stdout:
- output = stdout.decode("utf-8").strip()
- # output = stdout.decode()
- if stderr:
- output = stderr.decode("utf-8").strip()
- # output = stderr.decode()
-
- if return_code != 0 and show_error_log:
- self.log.debug(
- "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
- )
- else:
- self.log.debug("Return code: {}".format(return_code))
-
- if raise_exception_on_error and return_code != 0:
- raise K8sException(output)
-
- if encode_utf8:
- output = output.encode("utf-8").strip()
- output = str(output).replace("\\n", "\n")
-
- return output, return_code
-
- except asyncio.CancelledError:
- raise
- except K8sException:
- raise
- except Exception as e:
- msg = "Exception executing command: {} -> {}".format(command, e)
- self.log.error(msg)
- if raise_exception_on_error:
- raise K8sException(e) from e
- else:
- return "", -1
-
- def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
- # self.log.debug('Checking if file {} exists...'.format(filename))
- if os.path.exists(filename):
- return True
- else:
- msg = "File {} does not exist".format(filename)
- if exception_if_not_exists:
- # self.log.error(msg)
- raise K8sException(msg)