2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
30 from uuid
import uuid4
32 from n2vc
.k8s_conn
import K8sConnector
35 class K8sHelmConnector(K8sConnector
):
38 ##################################################################################################
39 ########################################## P U B L I C ###########################################
40 ##################################################################################################
47 kubectl_command
: str = '/usr/bin/kubectl',
48 helm_command
: str = '/usr/bin/helm',
54 :param fs: file system for kubernetes and helm configuration
55 :param db: database object to write current operation status
56 :param kubectl_command: path to kubectl executable
57 :param helm_command: path to helm executable
59 :param on_update_db: callback called when k8s connector updates database
63 K8sConnector
.__init
__(
67 on_update_db
=on_update_db
70 self
.info('Initializing K8S Helm connector')
72 # random numbers for release name generation
73 random
.seed(time
.time())
78 # exception if kubectl is not installed
79 self
.kubectl_command
= kubectl_command
80 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
82 # exception if helm is not installed
83 self
._helm
_command
= helm_command
84 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
86 self
.info('K8S Helm connector initialized')
91 namespace
: str = 'kube-system',
92 reuse_cluster_uuid
=None
95 cluster_uuid
= reuse_cluster_uuid
97 cluster_uuid
= str(uuid4())
99 self
.debug('Initializing K8S environment. namespace: {}'.format(namespace
))
101 # create config filename
102 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
103 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
104 f
= open(config_filename
, "w")
108 # check if tiller pod is up in cluster
109 command
= '{} --kubeconfig={} --namespace={} get deployments'\
110 .format(self
.kubectl_command
, config_filename
, namespace
)
111 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
113 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
115 # find 'tiller' pod in all pods
116 already_initialized
= False
118 for row
in output_table
:
119 if row
[0].startswith('tiller-deploy'):
120 already_initialized
= True
122 except Exception as e
:
126 n2vc_installed_sw
= False
127 if not already_initialized
:
128 self
.info('Initializing helm in client and server: {}'.format(cluster_uuid
))
129 command
= '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
130 .format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
131 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
132 n2vc_installed_sw
= True
134 # check client helm installation
135 check_file
= helm_dir
+ '/repository/repositories.yaml'
136 if not self
._check
_file
_exists
(filename
=check_file
, exception_if_not_exists
=False):
137 self
.info('Initializing helm in client: {}'.format(cluster_uuid
))
138 command
= '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
139 .format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
140 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
142 self
.info('Helm client already initialized')
144 self
.info('Cluster initialized {}'.format(cluster_uuid
))
146 return cluster_uuid
, n2vc_installed_sw
153 repo_type
: str = 'chart'
156 self
.debug('adding {} repository {}. URL: {}'.format(repo_type
, name
, url
))
159 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
160 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
163 command
= '{} --kubeconfig={} --home={} repo update'.format(self
._helm
_command
, config_filename
, helm_dir
)
164 self
.debug('updating repo: {}'.format(command
))
165 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
167 # helm repo add name url
168 command
= '{} --kubeconfig={} --home={} repo add {} {}'\
169 .format(self
._helm
_command
, config_filename
, helm_dir
, name
, url
)
170 self
.debug('adding repo: {}'.format(command
))
171 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
178 Get the list of registered repositories
180 :return: list of registered repositories: [ (name, url) .... ]
183 self
.debug('list repositories for cluster {}'.format(cluster_uuid
))
186 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
187 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
189 command
= '{} --kubeconfig={} --home={} repo list --output yaml'.format(self
._helm
_command
, config_filename
, helm_dir
)
191 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
192 if output
and len(output
) > 0:
193 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
197 async def repo_remove(
203 Remove a repository from OSM
205 :param cluster_uuid: the cluster
206 :param name: repo name in OSM
207 :return: True if successful
210 self
.debug('list repositories for cluster {}'.format(cluster_uuid
))
213 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
214 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
216 command
= '{} --kubeconfig={} --home={} repo remove {}'\
217 .format(self
._helm
_command
, config_filename
, helm_dir
, name
)
219 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
225 uninstall_sw
: bool = False
228 self
.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid
))
230 # get kube and helm directories
231 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
232 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=False)
234 # uninstall releases if needed
235 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
236 if len(releases
) > 0:
240 kdu_instance
= r
.get('Name')
241 chart
= r
.get('Chart')
242 self
.debug('Uninstalling {} -> {}'.format(chart
, kdu_instance
))
243 await self
.uninstall(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
244 except Exception as e
:
245 self
.error('Error uninstalling release {}: {}'.format(kdu_instance
, e
))
247 msg
= 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
248 .format(cluster_uuid
)
254 self
.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid
))
256 # find namespace for tiller pod
257 command
= '{} --kubeconfig={} get deployments --all-namespaces'\
258 .format(self
.kubectl_command
, config_filename
)
259 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
260 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
262 for r
in output_table
:
264 if 'tiller-deploy' in r
[1]:
267 except Exception as e
:
270 msg
= 'Tiller deployment not found in cluster {}'.format(cluster_uuid
)
272 # raise Exception(msg)
274 self
.debug('namespace for tiller: {}'.format(namespace
))
276 force_str
= '--force'
279 # delete tiller deployment
280 self
.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid
, namespace
))
281 command
= '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
282 .format(self
.kubectl_command
, namespace
, config_filename
, force_str
)
283 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
285 # uninstall tiller from cluster
286 self
.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid
))
287 command
= '{} --kubeconfig={} --home={} reset'\
288 .format(self
._helm
_command
, config_filename
, helm_dir
)
289 self
.debug('resetting: {}'.format(command
))
290 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
292 self
.debug('namespace not found')
294 # delete cluster directory
295 dir = self
.fs
.path
+ '/' + cluster_uuid
296 self
.debug('Removing directory {}'.format(dir))
297 shutil
.rmtree(dir, ignore_errors
=True)
306 timeout
: float = 300,
311 self
.debug('installing {} in cluster {}'.format(kdu_model
, cluster_uuid
))
314 end
= start
+ timeout
317 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
318 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
321 # params_str = K8sHelmConnector._params_to_set_option(params)
322 params_str
, file_to_delete
= self
._params
_to
_file
_option
(cluster_uuid
=cluster_uuid
, params
=params
)
326 timeout_str
= '--timeout {}'.format(timeout
)
331 atomic_str
= '--atomic'
336 parts
= kdu_model
.split(sep
=':')
338 version_str
= '--version {}'.format(parts
[1])
341 # generate a name for the releas. Then, check if already exists
343 while kdu_instance
is None:
344 kdu_instance
= K8sHelmConnector
._generate
_release
_name
(kdu_model
)
346 result
= await self
._status
_kdu
(
347 cluster_uuid
=cluster_uuid
,
348 kdu_instance
=kdu_instance
,
351 if result
is not None:
352 # instance already exists: generate a new one
358 command
= '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
359 .format(self
._helm
_command
, atomic_str
, config_filename
, helm_dir
,
360 params_str
, timeout_str
, kdu_instance
, kdu_model
, version_str
)
361 self
.debug('installing: {}'.format(command
))
364 # exec helm in a task
365 exec_task
= asyncio
.ensure_future(
366 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
368 # write status in another task
369 status_task
= asyncio
.ensure_future(
370 coro_or_future
=self
._store
_status
(
371 cluster_uuid
=cluster_uuid
,
372 kdu_instance
=kdu_instance
,
379 # wait for execution task
380 await asyncio
.wait([exec_task
])
385 output
, rc
= exec_task
.result()
389 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
391 # remove temporal values yaml file
393 os
.remove(file_to_delete
)
396 await self
._store
_status
(
397 cluster_uuid
=cluster_uuid
,
398 kdu_instance
=kdu_instance
,
406 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
410 self
.debug('Returning kdu_instance {}'.format(kdu_instance
))
413 async def instances_list(
418 returns a list of deployed releases in a cluster
420 :param cluster_uuid: the cluster
424 self
.debug('list releases for cluster {}'.format(cluster_uuid
))
427 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
428 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
430 command
= '{} --kubeconfig={} --home={} list --output yaml'\
431 .format(self
._helm
_command
, config_filename
, helm_dir
)
433 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
435 if output
and len(output
) > 0:
436 return yaml
.load(output
, Loader
=yaml
.SafeLoader
).get('Releases')
444 kdu_model
: str = None,
446 timeout
: float = 300,
451 self
.debug('upgrading {} in cluster {}'.format(kdu_model
, cluster_uuid
))
454 end
= start
+ timeout
457 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
458 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
461 # params_str = K8sHelmConnector._params_to_set_option(params)
462 params_str
, file_to_delete
= self
._params
_to
_file
_option
(cluster_uuid
=cluster_uuid
, params
=params
)
466 timeout_str
= '--timeout {}'.format(timeout
)
471 atomic_str
= '--atomic'
475 if kdu_model
and ':' in kdu_model
:
476 parts
= kdu_model
.split(sep
=':')
478 version_str
= '--version {}'.format(parts
[1])
482 command
= '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
483 .format(self
._helm
_command
, atomic_str
, config_filename
, helm_dir
,
484 params_str
, timeout_str
, kdu_instance
, kdu_model
, version_str
)
485 self
.debug('upgrading: {}'.format(command
))
489 # exec helm in a task
490 exec_task
= asyncio
.ensure_future(
491 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
493 # write status in another task
494 status_task
= asyncio
.ensure_future(
495 coro_or_future
=self
._store
_status
(
496 cluster_uuid
=cluster_uuid
,
497 kdu_instance
=kdu_instance
,
504 # wait for execution task
505 await asyncio
.wait([ exec_task
])
509 output
, rc
= exec_task
.result()
513 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
515 # remove temporal values yaml file
517 os
.remove(file_to_delete
)
520 await self
._store
_status
(
521 cluster_uuid
=cluster_uuid
,
522 kdu_instance
=kdu_instance
,
530 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
534 # return new revision number
535 instance
= await self
.get_instance_info(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
537 revision
= int(instance
.get('Revision'))
538 self
.debug('New revision: {}'.format(revision
))
551 self
.debug('rollback kdu_instance {} to revision {} from cluster {}'
552 .format(kdu_instance
, revision
, cluster_uuid
))
555 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
556 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
558 command
= '{} rollback --kubeconfig={} --home={} {} {} --wait'\
559 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
, revision
)
561 # exec helm in a task
562 exec_task
= asyncio
.ensure_future(
563 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
565 # write status in another task
566 status_task
= asyncio
.ensure_future(
567 coro_or_future
=self
._store
_status
(
568 cluster_uuid
=cluster_uuid
,
569 kdu_instance
=kdu_instance
,
571 operation
='rollback',
576 # wait for execution task
577 await asyncio
.wait([exec_task
])
582 output
, rc
= exec_task
.result()
585 await self
._store
_status
(
586 cluster_uuid
=cluster_uuid
,
587 kdu_instance
=kdu_instance
,
589 operation
='rollback',
595 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
599 # return new revision number
600 instance
= await self
.get_instance_info(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
602 revision
= int(instance
.get('Revision'))
603 self
.debug('New revision: {}'.format(revision
))
614 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
615 after all _terminate-config-primitive_ of the VNF are invoked).
617 :param cluster_uuid: UUID of a K8s cluster known by OSM
618 :param kdu_instance: unique name for the KDU instance to be deleted
619 :return: True if successful
622 self
.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance
, cluster_uuid
))
625 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
626 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
628 command
= '{} --kubeconfig={} --home={} delete --purge {}'\
629 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
)
631 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
633 return self
._output
_to
_table
(output
)
635 async def inspect_kdu(
640 self
.debug('inspect kdu_model {}'.format(kdu_model
))
642 command
= '{} inspect values {}'\
643 .format(self
._helm
_command
, kdu_model
)
645 output
, rc
= await self
._local
_async
_exec
(command
=command
)
654 self
.debug('help kdu_model {}'.format(kdu_model
))
656 command
= '{} inspect readme {}'\
657 .format(self
._helm
_command
, kdu_model
)
659 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
663 async def status_kdu(
669 return await self
._status
_kdu
(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
, show_error_log
=True)
673 ##################################################################################################
674 ########################################## P R I V A T E #########################################
675 ##################################################################################################
678 async def _status_kdu(
682 show_error_log
: bool = False
685 self
.debug('status of kdu_instance {}'.format(kdu_instance
))
688 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
689 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
691 command
= '{} --kubeconfig={} --home={} status {} --output yaml'\
692 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
)
694 output
, rc
= await self
._local
_async
_exec
(
696 raise_exception_on_error
=True,
697 show_error_log
=show_error_log
703 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
705 # remove field 'notes'
707 del data
.get('info').get('status')['notes']
711 # parse field 'resources'
713 resources
= str(data
.get('info').get('status').get('resources'))
714 resource_table
= self
._output
_to
_table
(resources
)
715 data
.get('info').get('status')['resources'] = resource_table
716 except Exception as e
:
721 async def get_instance_info(
726 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
727 for instance
in instances
:
728 if instance
.get('Name') == kdu_instance
:
730 self
.debug('Instance {} not found'.format(kdu_instance
))
734 def _generate_release_name(
739 if c
.isalpha() or c
.isnumeric():
746 # if does not start with alpha character, prefix 'a'
747 if not name
[0].isalpha():
752 def get_random_number():
753 r
= random
.randrange(start
=1, stop
=99999999)
755 s
= s
.rjust(width
=10, fillchar
=' ')
758 name
= name
+ get_random_number()
761 async def _store_status(
766 check_every
: float = 10,
767 db_dict
: dict = None,
768 run_once
: bool = False
772 await asyncio
.sleep(check_every
)
773 detailed_status
= await self
.status_kdu(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
774 status
= detailed_status
.get('info').get('Description')
776 self
.debug('STATUS:\n{}'.format(status
))
777 self
.debug('DETAILED STATUS:\n{}'.format(detailed_status
))
780 result
= await self
.write_app_status_to_db(
783 detailed_status
=str(detailed_status
),
786 self
.info('Error writing in database. Task exiting...')
788 except asyncio
.CancelledError
:
789 self
.debug('Task cancelled')
791 except Exception as e
:
797 async def _is_install_completed(
803 status
= await self
.status_kdu(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
805 # extract info.status.resources-> str
808 # NAME READY UP-TO-DATE AVAILABLE AGE
809 # halting-horse-mongodb 0/1 1 0 0s
810 # halting-petit-mongodb 1/1 1 0 0s
812 resources
= K8sHelmConnector
._get
_deep
(status
, ('info', 'status', 'resources'))
815 resources
= K8sHelmConnector
._output
_to
_table
(resources
)
817 num_lines
= len(resources
)
819 while index
< num_lines
:
821 line1
= resources
[index
]
823 # find '==>' in column 0
824 if line1
[0] == '==>':
825 line2
= resources
[index
]
827 # find READY in column 1
828 if line2
[1] == 'READY':
830 line3
= resources
[index
]
832 while len(line3
) > 1 and index
< num_lines
:
833 ready_value
= line3
[1]
834 parts
= ready_value
.split(sep
='/')
835 current
= int(parts
[0])
836 total
= int(parts
[1])
838 self
.debug('NOT READY:\n {}'.format(line3
))
840 line3
= resources
[index
]
843 except Exception as e
:
849 def _get_deep(dictionary
: dict, members
: tuple):
854 value
= target
.get(m
)
859 except Exception as e
:
863 # find key:value in several lines
865 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
868 if line
.startswith(p_key
+ ':'):
869 parts
= line
.split(':')
870 the_value
= parts
[1].strip()
872 except Exception as e
:
877 # params for use in -f file
878 # returns values file option and filename (in order to delete it at the end)
879 def _params_to_file_option(self
, cluster_uuid
: str, params
: dict) -> (str, str):
882 if params
and len(params
) > 0:
883 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
884 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
886 def get_random_number():
887 r
= random
.randrange(start
=1, stop
=99999999)
895 value
= params
.get(key
)
896 if '!!yaml' in str(value
):
897 value
= yaml
.load(value
[7:])
900 values_file
= get_random_number() + '.yaml'
901 with
open(values_file
, 'w') as stream
:
902 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
904 return '-f {}'.format(values_file
), values_file
908 # params for use in --set option
910 def _params_to_set_option(params
: dict) -> str:
912 if params
and len(params
) > 0:
915 value
= params
.get(key
, None)
916 if value
is not None:
918 params_str
+= '--set '
922 params_str
+= '{}={}'.format(key
, value
)
926 def _output_to_lines(output
: str) -> list:
927 output_lines
= list()
928 lines
= output
.splitlines(keepends
=False)
932 output_lines
.append(line
)
936 def _output_to_table(output
: str) -> list:
937 output_table
= list()
938 lines
= output
.splitlines(keepends
=False)
940 line
= line
.replace('\t', ' ')
942 output_table
.append(line_list
)
943 cells
= line
.split(sep
=' ')
947 line_list
.append(cell
)
950 def _get_paths(self
, cluster_name
: str, create_if_not_exist
: bool = False) -> (str, str, str, str):
952 Returns kube and helm directories
955 :param create_if_not_exist:
956 :return: kube, helm directories, config filename and cluster dir.
957 Raises exception if not exist and cannot create
961 if base
.endswith("/") or base
.endswith("\\"):
964 # base dir for cluster
965 cluster_dir
= base
+ '/' + cluster_name
966 if create_if_not_exist
and not os
.path
.exists(cluster_dir
):
967 self
.debug('Creating dir {}'.format(cluster_dir
))
968 os
.makedirs(cluster_dir
)
969 if not os
.path
.exists(cluster_dir
):
970 msg
= 'Base cluster dir {} does not exist'.format(cluster_dir
)
975 kube_dir
= cluster_dir
+ '/' + '.kube'
976 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
977 self
.debug('Creating dir {}'.format(kube_dir
))
978 os
.makedirs(kube_dir
)
979 if not os
.path
.exists(kube_dir
):
980 msg
= 'Kube config dir {} does not exist'.format(kube_dir
)
985 helm_dir
= cluster_dir
+ '/' + '.helm'
986 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
987 self
.debug('Creating dir {}'.format(helm_dir
))
988 os
.makedirs(helm_dir
)
989 if not os
.path
.exists(helm_dir
):
990 msg
= 'Helm config dir {} does not exist'.format(helm_dir
)
994 config_filename
= kube_dir
+ '/config'
995 return kube_dir
, helm_dir
, config_filename
, cluster_dir
998 def _remove_multiple_spaces(str):
1001 str = str.replace(' ', ' ')
1008 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1009 self
.debug('Executing sync local command: {}'.format(command
))
1010 # raise exception if fails
1013 output
= subprocess
.check_output(command
, shell
=True, universal_newlines
=True)
1016 except Exception as e
:
1019 return output
, return_code
1021 async def _local_async_exec(
1024 raise_exception_on_error
: bool = False,
1025 show_error_log
: bool = True
1028 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1029 self
.debug('Executing async local command: {}'.format(command
))
1032 command
= command
.split(sep
=' ')
1035 process
= await asyncio
.create_subprocess_exec(
1037 stdout
=asyncio
.subprocess
.PIPE
,
1038 stderr
=asyncio
.subprocess
.PIPE
1041 # wait for command terminate
1042 stdout
, stderr
= await process
.communicate()
1044 return_code
= process
.returncode
1048 output
= stdout
.decode('utf-8').strip()
1050 output
= stderr
.decode('utf-8').strip()
1052 if return_code
!= 0 and show_error_log
:
1053 self
.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code
, output
))
1055 self
.debug('Return code: {}'.format(return_code
))
1057 if raise_exception_on_error
and return_code
!= 0:
1058 raise Exception(output
)
1060 return output
, return_code
1062 except Exception as e
:
1063 msg
= 'Exception executing command: {} -> {}'.format(command
, e
)
1077 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1078 self
.debug('Executing sync remote ssh command: {}'.format(command
))
1080 ssh
= paramiko
.SSHClient()
1081 ssh
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
1082 ssh
.connect(hostname
=hostname
, username
=username
, password
=password
)
1083 ssh_stdin
, ssh_stdout
, ssh_stderr
= ssh
.exec_command(command
=command
, timeout
=timeout
)
1084 output
= ssh_stdout
.read().decode('utf-8')
1085 error
= ssh_stderr
.read().decode('utf-8')
1087 self
.error('ERROR: {}'.format(error
))
1091 output
= output
.replace('\\n', '\n')
1092 self
.debug('OUTPUT: {}'.format(output
))
1094 return output
, return_code
1096 def _check_file_exists(self
, filename
: str, exception_if_not_exists
: bool = False):
1097 self
.debug('Checking if file {} exists...'.format(filename
))
1098 if os
.path
.exists(filename
):
1101 msg
= 'File {} does not exist'.format(filename
)
1102 if exception_if_not_exists
:
1104 raise Exception(msg
)