2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from uuid
import uuid4
33 from n2vc
.k8s_conn
import K8sConnector
36 class K8sHelmConnector(K8sConnector
):
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
48 kubectl_command
: str = '/usr/bin/kubectl',
49 helm_command
: str = '/usr/bin/helm',
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
60 :param on_update_db: callback called when k8s connector updates database
64 K8sConnector
.__init
__(
68 on_update_db
=on_update_db
71 self
.info('Initializing K8S Helm connector')
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 self
.info('K8S Helm connector initialized')
92 namespace
: str = 'kube-system',
93 reuse_cluster_uuid
=None
96 cluster_uuid
= reuse_cluster_uuid
98 cluster_uuid
= str(uuid4())
100 self
.debug('Initializing K8S environment. namespace: {}'.format(namespace
))
102 # create config filename
103 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
104 f
= open(config_filename
, "w")
108 # check if tiller pod is up in cluster
109 command
= '{} --kubeconfig={} --namespace={} get deployments'\
110 .format(self
.kubectl_command
, config_filename
, namespace
)
111 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
113 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
115 # find 'tiller' pod in all pods
116 already_initialized
= False
118 for row
in output_table
:
119 if row
[0].startswith('tiller-deploy'):
120 already_initialized
= True
122 except Exception as e
:
126 n2vc_installed_sw
= False
127 if not already_initialized
:
128 self
.info('Initializing helm in client and server: {}'.format(cluster_uuid
))
129 command
= '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
130 .format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
131 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
132 n2vc_installed_sw
= True
134 # check client helm installation
135 check_file
= helm_dir
+ '/repository/repositories.yaml'
136 if not self
._check
_file
_exists
(filename
=check_file
, exception_if_not_exists
=False):
137 self
.info('Initializing helm in client: {}'.format(cluster_uuid
))
138 command
= '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
139 .format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
140 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
142 self
.info('Helm client already initialized')
144 self
.info('Cluster initialized {}'.format(cluster_uuid
))
146 return cluster_uuid
, n2vc_installed_sw
153 repo_type
: str = 'chart'
156 self
.debug('adding {} repository {}. URL: {}'.format(repo_type
, name
, url
))
159 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
162 command
= '{} --kubeconfig={} --home={} repo update'.format(self
._helm
_command
, config_filename
, helm_dir
)
163 self
.debug('updating repo: {}'.format(command
))
164 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
166 # helm repo add name url
167 command
= '{} --kubeconfig={} --home={} repo add {} {}'\
168 .format(self
._helm
_command
, config_filename
, helm_dir
, name
, url
)
169 self
.debug('adding repo: {}'.format(command
))
170 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
177 Get the list of registered repositories
179 :return: list of registered repositories: [ (name, url) .... ]
182 self
.debug('list repositories for cluster {}'.format(cluster_uuid
))
185 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
187 command
= '{} --kubeconfig={} --home={} repo list --output yaml'.format(self
._helm
_command
, config_filename
, helm_dir
)
189 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
190 if output
and len(output
) > 0:
191 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
195 async def repo_remove(
201 Remove a repository from OSM
203 :param cluster_uuid: the cluster
204 :param name: repo name in OSM
205 :return: True if successful
208 self
.debug('list repositories for cluster {}'.format(cluster_uuid
))
211 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
213 command
= '{} --kubeconfig={} --home={} repo remove {}'\
214 .format(self
._helm
_command
, config_filename
, helm_dir
, name
)
216 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
222 uninstall_sw
: bool = False
225 self
.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid
))
227 # get kube and helm directories
228 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=False)
230 # uninstall releases if needed
231 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
232 if len(releases
) > 0:
236 kdu_instance
= r
.get('Name')
237 chart
= r
.get('Chart')
238 self
.debug('Uninstalling {} -> {}'.format(chart
, kdu_instance
))
239 await self
.uninstall(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
240 except Exception as e
:
241 self
.error('Error uninstalling release {}: {}'.format(kdu_instance
, e
))
243 msg
= 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
244 .format(cluster_uuid
)
250 self
.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid
))
252 # find namespace for tiller pod
253 command
= '{} --kubeconfig={} get deployments --all-namespaces'\
254 .format(self
.kubectl_command
, config_filename
)
255 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
256 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
258 for r
in output_table
:
260 if 'tiller-deploy' in r
[1]:
263 except Exception as e
:
266 msg
= 'Tiller deployment not found in cluster {}'.format(cluster_uuid
)
268 # raise Exception(msg)
270 self
.debug('namespace for tiller: {}'.format(namespace
))
272 force_str
= '--force'
275 # delete tiller deployment
276 self
.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid
, namespace
))
277 command
= '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
278 .format(self
.kubectl_command
, namespace
, config_filename
, force_str
)
279 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
281 # uninstall tiller from cluster
282 self
.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid
))
283 command
= '{} --kubeconfig={} --home={} reset'\
284 .format(self
._helm
_command
, config_filename
, helm_dir
)
285 self
.debug('resetting: {}'.format(command
))
286 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
288 self
.debug('namespace not found')
290 # delete cluster directory
291 dir = self
.fs
.path
+ '/' + cluster_uuid
292 self
.debug('Removing directory {}'.format(dir))
293 shutil
.rmtree(dir, ignore_errors
=True)
302 timeout
: float = 300,
307 self
.debug('installing {} in cluster {}'.format(kdu_model
, cluster_uuid
))
310 end
= start
+ timeout
313 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
316 params_str
= K8sHelmConnector
._params
_to
_set
_option
(params
)
320 timeout_str
= '--timeout {}'.format(timeout
)
325 atomic_str
= '--atomic'
330 parts
= kdu_model
.split(sep
=':')
332 version_str
= '--version {}'.format(parts
[1])
335 # generate a name for the release, then check whether it already exists
337 while kdu_instance
is None:
338 kdu_instance
= K8sHelmConnector
._generate
_release
_name
(kdu_model
)
340 result
= await self
._status
_kdu
(
341 cluster_uuid
=cluster_uuid
,
342 kdu_instance
=kdu_instance
,
345 if result
is not None:
346 # instance already exists: generate a new one
352 command
= '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
353 .format(self
._helm
_command
, atomic_str
, config_filename
, helm_dir
,
354 params_str
, timeout_str
, kdu_instance
, kdu_model
, version_str
)
355 self
.debug('installing: {}'.format(command
))
358 # exec helm in a task
359 exec_task
= asyncio
.ensure_future(
360 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
362 # write status in another task
363 status_task
= asyncio
.ensure_future(
364 coro_or_future
=self
._store
_status
(
365 cluster_uuid
=cluster_uuid
,
366 kdu_instance
=kdu_instance
,
373 # wait for execution task
374 await asyncio
.wait([exec_task
])
379 output
, rc
= exec_task
.result()
383 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
386 await self
._store
_status
(
387 cluster_uuid
=cluster_uuid
,
388 kdu_instance
=kdu_instance
,
396 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
400 self
.debug('Returning kdu_instance {}'.format(kdu_instance
))
403 async def instances_list(
408 returns a list of deployed releases in a cluster
410 :param cluster_uuid: the cluster
414 self
.debug('list releases for cluster {}'.format(cluster_uuid
))
417 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
419 command
= '{} --kubeconfig={} --home={} list --output yaml'\
420 .format(self
._helm
_command
, config_filename
, helm_dir
)
422 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
424 if output
and len(output
) > 0:
425 return yaml
.load(output
, Loader
=yaml
.SafeLoader
).get('Releases')
433 kdu_model
: str = None,
435 timeout
: float = 300,
440 self
.debug('upgrading {} in cluster {}'.format(kdu_model
, cluster_uuid
))
443 end
= start
+ timeout
446 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
449 params_str
= K8sHelmConnector
._params
_to
_set
_option
(params
)
453 timeout_str
= '--timeout {}'.format(timeout
)
458 atomic_str
= '--atomic'
463 parts
= kdu_model
.split(sep
=':')
465 version_str
= '--version {}'.format(parts
[1])
469 command
= '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
470 .format(self
._helm
_command
, atomic_str
, config_filename
, helm_dir
,
471 params_str
, timeout_str
, kdu_instance
, kdu_model
, version_str
)
472 self
.debug('upgrading: {}'.format(command
))
476 # exec helm in a task
477 exec_task
= asyncio
.ensure_future(
478 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
480 # write status in another task
481 status_task
= asyncio
.ensure_future(
482 coro_or_future
=self
._store
_status
(
483 cluster_uuid
=cluster_uuid
,
484 kdu_instance
=kdu_instance
,
491 # wait for execution task
492 await asyncio
.wait([ exec_task
])
496 output
, rc
= exec_task
.result()
500 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
503 await self
._store
_status
(
504 cluster_uuid
=cluster_uuid
,
505 kdu_instance
=kdu_instance
,
513 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
517 # return new revision number
518 instance
= await self
.get_instance_info(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
520 revision
= int(instance
.get('Revision'))
521 self
.debug('New revision: {}'.format(revision
))
534 self
.debug('rollback kdu_instance {} to revision {} from cluster {}'
535 .format(kdu_instance
, revision
, cluster_uuid
))
538 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
540 command
= '{} rollback --kubeconfig={} --home={} {} {} --wait'\
541 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
, revision
)
543 # exec helm in a task
544 exec_task
= asyncio
.ensure_future(
545 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
547 # write status in another task
548 status_task
= asyncio
.ensure_future(
549 coro_or_future
=self
._store
_status
(
550 cluster_uuid
=cluster_uuid
,
551 kdu_instance
=kdu_instance
,
553 operation
='rollback',
558 # wait for execution task
559 await asyncio
.wait([exec_task
])
564 output
, rc
= exec_task
.result()
567 await self
._store
_status
(
568 cluster_uuid
=cluster_uuid
,
569 kdu_instance
=kdu_instance
,
571 operation
='rollback',
577 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
581 # return new revision number
582 instance
= await self
.get_instance_info(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
584 revision
= int(instance
.get('Revision'))
585 self
.debug('New revision: {}'.format(revision
))
596 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
597 after all _terminate-config-primitive_ of the VNF are invoked).
599 :param cluster_uuid: UUID of a K8s cluster known by OSM
600 :param kdu_instance: unique name for the KDU instance to be deleted
601 :return: True if successful
604 self
.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance
, cluster_uuid
))
607 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
609 command
= '{} --kubeconfig={} --home={} delete --purge {}'\
610 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
)
612 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
614 return self
._output
_to
_table
(output
)
616 async def inspect_kdu(
621 self
.debug('inspect kdu_model {}'.format(kdu_model
))
623 command
= '{} inspect values {}'\
624 .format(self
._helm
_command
, kdu_model
)
626 output
, rc
= await self
._local
_async
_exec
(command
=command
)
635 self
.debug('help kdu_model {}'.format(kdu_model
))
637 command
= '{} inspect readme {}'\
638 .format(self
._helm
_command
, kdu_model
)
640 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
644 async def status_kdu(
650 return await self
._status
_kdu
(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
, show_error_log
=True)
654 ##################################################################################################
655 ########################################## P R I V A T E #########################################
656 ##################################################################################################
659 async def _status_kdu(
663 show_error_log
: bool = False
666 self
.debug('status of kdu_instance {}'.format(kdu_instance
))
669 kube_dir
, helm_dir
, config_filename
= self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
671 command
= '{} --kubeconfig={} --home={} status {} --output yaml'\
672 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
)
674 output
, rc
= await self
._local
_async
_exec
(
676 raise_exception_on_error
=True,
677 show_error_log
=show_error_log
683 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
685 # remove field 'notes'
687 del data
.get('info').get('status')['notes']
691 # parse field 'resources'
693 resources
= str(data
.get('info').get('status').get('resources'))
694 resource_table
= self
._output
_to
_table
(resources
)
695 data
.get('info').get('status')['resources'] = resource_table
696 except Exception as e
:
702 async def get_instance_info(
707 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
708 for instance
in instances
:
709 if instance
.get('Name') == kdu_instance
:
711 self
.debug('Instance {} not found'.format(kdu_instance
))
715 def _generate_release_name(
720 if c
.isalpha() or c
.isnumeric():
727 # if the name does not start with an alphabetic character, prefix it with 'a'
728 if not name
[0].isalpha():
733 def get_random_number():
734 r
= random
.randrange(start
=1, stop
=99999999)
740 name
= name
+ get_random_number()
743 async def _store_status(
748 check_every
: float = 10,
749 db_dict
: dict = None,
750 run_once
: bool = False
754 await asyncio
.sleep(check_every
)
755 detailed_status
= await self
.status_kdu(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
756 status
= detailed_status
.get('info').get('Description')
758 self
.debug('STATUS:\n{}'.format(status
))
759 self
.debug('DETAILED STATUS:\n{}'.format(detailed_status
))
762 result
= await self
.write_app_status_to_db(
765 detailed_status
=str(detailed_status
),
768 self
.info('Error writing in database. Task exiting...')
770 except asyncio
.CancelledError
:
771 self
.debug('Task cancelled')
773 except Exception as e
:
779 async def _is_install_completed(
785 status
= await self
.status_kdu(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
787 # extract info.status.resources-> str
790 # NAME READY UP-TO-DATE AVAILABLE AGE
791 # halting-horse-mongodb 0/1 1 0 0s
792 # halting-petit-mongodb 1/1 1 0 0s
794 resources
= K8sHelmConnector
._get
_deep
(status
, ('info', 'status', 'resources'))
797 resources
= K8sHelmConnector
._output
_to
_table
(resources
)
799 num_lines
= len(resources
)
801 while index
< num_lines
:
803 line1
= resources
[index
]
805 # find '==>' in column 0
806 if line1
[0] == '==>':
807 line2
= resources
[index
]
809 # find READY in column 1
810 if line2
[1] == 'READY':
812 line3
= resources
[index
]
814 while len(line3
) > 1 and index
< num_lines
:
815 ready_value
= line3
[1]
816 parts
= ready_value
.split(sep
='/')
817 current
= int(parts
[0])
818 total
= int(parts
[1])
820 self
.debug('NOT READY:\n {}'.format(line3
))
822 line3
= resources
[index
]
825 except Exception as e
:
831 def _get_deep(dictionary
: dict, members
: tuple):
836 value
= target
.get(m
)
841 except Exception as e
:
845 # find key:value in several lines
847 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
850 if line
.startswith(p_key
+ ':'):
851 parts
= line
.split(':')
852 the_value
= parts
[1].strip()
854 except Exception as e
:
859 # params for use in --set option
861 def _params_to_set_option(params
: dict) -> str:
863 if params
and len(params
) > 0:
866 value
= params
.get(key
, None)
867 if value
is not None:
869 params_str
+= '--set '
873 params_str
+= '{}={}'.format(key
, value
)
877 def _output_to_lines(output
: str) -> list:
878 output_lines
= list()
879 lines
= output
.splitlines(keepends
=False)
883 output_lines
.append(line
)
887 def _output_to_table(output
: str) -> list:
888 output_table
= list()
889 lines
= output
.splitlines(keepends
=False)
891 line
= line
.replace('\t', ' ')
893 output_table
.append(line_list
)
894 cells
= line
.split(sep
=' ')
898 line_list
.append(cell
)
901 def _get_paths(self
, cluster_name
: str, create_if_not_exist
: bool = False) -> (str, str, str):
903 Returns kube and helm directories
906 :param create_if_not_exist:
907 :return: kube, helm directories and config filename. Raises exception if not exist and cannot create
911 if base
.endswith("/") or base
.endswith("\\"):
914 # base dir for cluster
915 cluster_dir
= base
+ '/' + cluster_name
916 if create_if_not_exist
and not os
.path
.exists(cluster_dir
):
917 self
.debug('Creating dir {}'.format(cluster_dir
))
918 os
.makedirs(cluster_dir
)
919 if not os
.path
.exists(cluster_dir
):
920 msg
= 'Base cluster dir {} does not exist'.format(cluster_dir
)
925 kube_dir
= cluster_dir
+ '/' + '.kube'
926 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
927 self
.debug('Creating dir {}'.format(kube_dir
))
928 os
.makedirs(kube_dir
)
929 if not os
.path
.exists(kube_dir
):
930 msg
= 'Kube config dir {} does not exist'.format(kube_dir
)
935 helm_dir
= cluster_dir
+ '/' + '.helm'
936 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
937 self
.debug('Creating dir {}'.format(helm_dir
))
938 os
.makedirs(helm_dir
)
939 if not os
.path
.exists(helm_dir
):
940 msg
= 'Helm config dir {} does not exist'.format(helm_dir
)
944 config_filename
= kube_dir
+ '/config'
945 return kube_dir
, helm_dir
, config_filename
948 def _remove_multiple_spaces(str):
951 str = str.replace(' ', ' ')
958 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
959 self
.debug('Executing sync local command: {}'.format(command
))
960 # raise exception if fails
963 output
= subprocess
.check_output(command
, shell
=True, universal_newlines
=True)
966 except Exception as e
:
969 return output
, return_code
971 async def _local_async_exec(
974 raise_exception_on_error
: bool = False,
975 show_error_log
: bool = False
978 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
979 self
.debug('Executing async local command: {}'.format(command
))
982 command
= command
.split(sep
=' ')
985 process
= await asyncio
.create_subprocess_exec(
987 stdout
=asyncio
.subprocess
.PIPE
,
988 stderr
=asyncio
.subprocess
.PIPE
991 # wait for the command to terminate
992 stdout
, stderr
= await process
.communicate()
994 return_code
= process
.returncode
998 output
= stdout
.decode('utf-8').strip()
1000 output
= stderr
.decode('utf-8').strip()
1002 if return_code
!= 0 and show_error_log
:
1003 self
.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code
, output
))
1005 self
.debug('Return code: {}'.format(return_code
))
1007 if raise_exception_on_error
and return_code
!= 0:
1008 raise Exception(output
)
1010 return output
, return_code
1012 except Exception as e
:
1013 msg
= 'Exception executing command: {} -> {}'.format(command
, e
)
1027 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1028 self
.debug('Executing sync remote ssh command: {}'.format(command
))
1030 ssh
= paramiko
.SSHClient()
1031 ssh
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
1032 ssh
.connect(hostname
=hostname
, username
=username
, password
=password
)
1033 ssh_stdin
, ssh_stdout
, ssh_stderr
= ssh
.exec_command(command
=command
, timeout
=timeout
)
1034 output
= ssh_stdout
.read().decode('utf-8')
1035 error
= ssh_stderr
.read().decode('utf-8')
1037 self
.error('ERROR: {}'.format(error
))
1041 output
= output
.replace('\\n', '\n')
1042 self
.debug('OUTPUT: {}'.format(output
))
1044 return output
, return_code
1046 def _check_file_exists(self
, filename
: str, exception_if_not_exists
: bool = False):
1047 self
.debug('Checking if file {} exists...'.format(filename
))
1048 if os
.path
.exists(filename
):
1051 msg
= 'File {} does not exist'.format(filename
)
1052 if exception_if_not_exists
:
1054 raise Exception(msg
)