- @staticmethod
- def _get_deep(dictionary: dict, members: tuple):
- target = dictionary
- value = None
- try:
- for m in members:
- value = target.get(m)
- if not value:
- return None
- else:
- target = value
- except Exception as e:
- pass
- return value
-
- # find key:value in several lines
- @staticmethod
- def _find_in_lines(p_lines: list, p_key: str) -> str:
- for line in p_lines:
- try:
- if line.startswith(p_key + ':'):
- parts = line.split(':')
- the_value = parts[1].strip()
- return the_value
- except Exception as e:
- # ignore it
- pass
- return None
-
- # params for use in -f file
- # returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
-
- if params and len(params) > 0:
- kube_dir, helm_dir, config_filename, cluster_dir = \
- self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
-
- def get_random_number():
- r = random.randrange(start=1, stop=99999999)
- s = str(r)
- while len(s) < 10:
- s = '0' + s
- return s
-
- params2 = dict()
- for key in params:
- value = params.get(key)
- if '!!yaml' in str(value):
- value = yaml.load(value[7:])
- params2[key] = value
-
- values_file = get_random_number() + '.yaml'
- with open(values_file, 'w') as stream:
- yaml.dump(params2, stream, indent=4, default_flow_style=False)
-
- return '-f {}'.format(values_file), values_file
-
- return '', None
-
- # params for use in --set option
- @staticmethod
- def _params_to_set_option(params: dict) -> str:
- params_str = ''
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += '--set '
- start = False
- else:
- params_str += ','
- params_str += '{}={}'.format(key, value)
- return params_str
-
- @staticmethod
- def _output_to_lines(output: str) -> list:
- output_lines = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.strip()
- if len(line) > 0:
- output_lines.append(line)
- return output_lines
-
- @staticmethod
- def _output_to_table(output: str) -> list:
- output_table = list()
- lines = output.splitlines(keepends=False)
- for line in lines:
- line = line.replace('\t', ' ')
- line_list = list()
- output_table.append(line_list)
- cells = line.split(sep=' ')
- for cell in cells:
- cell = cell.strip()
- if len(cell) > 0:
- line_list.append(cell)
- return output_table
-
- def _get_paths(self, cluster_name: str, create_if_not_exist: bool = False) -> (str, str, str, str):
- """
- Returns kube and helm directories
-
- :param cluster_name:
- :param create_if_not_exist:
- :return: kube, helm directories, config filename and cluster dir.
- Raises exception if not exist and cannot create
- """
-
- base = self.fs.path
- if base.endswith("/") or base.endswith("\\"):
- base = base[:-1]
-
- # base dir for cluster
- cluster_dir = base + '/' + cluster_name
- if create_if_not_exist and not os.path.exists(cluster_dir):
- self.log.debug('Creating dir {}'.format(cluster_dir))
- os.makedirs(cluster_dir)
- if not os.path.exists(cluster_dir):
- msg = 'Base cluster dir {} does not exist'.format(cluster_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- # kube dir
- kube_dir = cluster_dir + '/' + '.kube'
- if create_if_not_exist and not os.path.exists(kube_dir):
- self.log.debug('Creating dir {}'.format(kube_dir))
- os.makedirs(kube_dir)
- if not os.path.exists(kube_dir):
- msg = 'Kube config dir {} does not exist'.format(kube_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- # helm home dir
- helm_dir = cluster_dir + '/' + '.helm'
- if create_if_not_exist and not os.path.exists(helm_dir):
- self.log.debug('Creating dir {}'.format(helm_dir))
- os.makedirs(helm_dir)
- if not os.path.exists(helm_dir):
- msg = 'Helm config dir {} does not exist'.format(helm_dir)
- self.log.error(msg)
- raise K8sException(msg)
-
- config_filename = kube_dir + '/config'
- return kube_dir, helm_dir, config_filename, cluster_dir
-
- @staticmethod
- def _remove_multiple_spaces(str):
- str = str.strip()
- while ' ' in str:
- str = str.replace(' ', ' ')
- return str
-
- def _local_exec(
- self,
- command: str
- ) -> (str, int):
- command = K8sHelmConnector._remove_multiple_spaces(command)
- self.log.debug('Executing sync local command: {}'.format(command))
- # raise exception if fails
- output = ''
- try:
- output = subprocess.check_output(command, shell=True, universal_newlines=True)
- return_code = 0
- self.log.debug(output)
- except Exception as e:
- return_code = 1
-
- return output, return_code
-
- async def _local_async_exec(
- self,
- command: str,
- raise_exception_on_error: bool = False,
- show_error_log: bool = True,
- encode_utf8: bool = False
- ) -> (str, int):