2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
30 from uuid
import uuid4
32 from n2vc
.k8s_conn
import K8sConnector
33 from n2vc
.exceptions
import K8sException
36 class K8sHelmConnector(K8sConnector
):
39 ##################################################################################################
40 ########################################## P U B L I C ###########################################
41 ##################################################################################################
48 kubectl_command
: str = '/usr/bin/kubectl',
49 helm_command
: str = '/usr/bin/helm',
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
60 :param on_update_db: callback called when k8s connector updates database
64 K8sConnector
.__init
__(
68 on_update_db
=on_update_db
71 self
.info('Initializing K8S Helm connector')
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # initialize helm client-only
88 self
.debug('Initializing helm client-only...')
89 command
= '{} init --client-only'.format(self
._helm
_command
)
91 asyncio
.ensure_future(self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False))
92 # loop = asyncio.get_event_loop()
93 # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
94 except Exception as e
:
95 self
.warning(msg
='helm init failed (it was already initialized): {}'.format(e
))
97 self
.info('K8S Helm connector initialized')
102 namespace
: str = 'kube-system',
103 reuse_cluster_uuid
=None
106 cluster_uuid
= reuse_cluster_uuid
108 cluster_uuid
= str(uuid4())
110 self
.debug('Initializing K8S environment. namespace: {}'.format(namespace
))
112 # create config filename
113 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
114 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
115 f
= open(config_filename
, "w")
119 # check if tiller pod is up in cluster
120 command
= '{} --kubeconfig={} --namespace={} get deployments'\
121 .format(self
.kubectl_command
, config_filename
, namespace
)
122 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
124 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
126 # find 'tiller' pod in all pods
127 already_initialized
= False
129 for row
in output_table
:
130 if row
[0].startswith('tiller-deploy'):
131 already_initialized
= True
133 except Exception as e
:
137 n2vc_installed_sw
= False
138 if not already_initialized
:
139 self
.info('Initializing helm in client and server: {}'.format(cluster_uuid
))
140 command
= '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
141 .format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
142 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
143 n2vc_installed_sw
= True
145 # check client helm installation
146 check_file
= helm_dir
+ '/repository/repositories.yaml'
147 if not self
._check
_file
_exists
(filename
=check_file
, exception_if_not_exists
=False):
148 self
.info('Initializing helm in client: {}'.format(cluster_uuid
))
149 command
= '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
150 .format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
151 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
153 self
.info('Helm client already initialized')
155 self
.info('Cluster initialized {}'.format(cluster_uuid
))
157 return cluster_uuid
, n2vc_installed_sw
164 repo_type
: str = 'chart'
167 self
.debug('adding {} repository {}. URL: {}'.format(repo_type
, name
, url
))
170 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
171 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
174 command
= '{} --kubeconfig={} --home={} repo update'.format(self
._helm
_command
, config_filename
, helm_dir
)
175 self
.debug('updating repo: {}'.format(command
))
176 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
178 # helm repo add name url
179 command
= '{} --kubeconfig={} --home={} repo add {} {}'\
180 .format(self
._helm
_command
, config_filename
, helm_dir
, name
, url
)
181 self
.debug('adding repo: {}'.format(command
))
182 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
189 Get the list of registered repositories
191 :return: list of registered repositories: [ (name, url) .... ]
194 self
.debug('list repositories for cluster {}'.format(cluster_uuid
))
197 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
198 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
200 command
= '{} --kubeconfig={} --home={} repo list --output yaml'\
201 .format(self
._helm
_command
, config_filename
, helm_dir
)
203 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
204 if output
and len(output
) > 0:
205 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
209 async def repo_remove(
215 Remove a repository from OSM
217 :param cluster_uuid: the cluster
218 :param name: repo name in OSM
219 :return: True if successful
222 self
.debug('list repositories for cluster {}'.format(cluster_uuid
))
225 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
226 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
228 command
= '{} --kubeconfig={} --home={} repo remove {}'\
229 .format(self
._helm
_command
, config_filename
, helm_dir
, name
)
231 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
237 uninstall_sw
: bool = False
240 self
.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid
))
242 # get kube and helm directories
243 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
244 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=False)
246 # uninstall releases if needed
247 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
248 if len(releases
) > 0:
252 kdu_instance
= r
.get('Name')
253 chart
= r
.get('Chart')
254 self
.debug('Uninstalling {} -> {}'.format(chart
, kdu_instance
))
255 await self
.uninstall(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
256 except Exception as e
:
257 self
.error('Error uninstalling release {}: {}'.format(kdu_instance
, e
))
259 msg
= 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
260 .format(cluster_uuid
)
262 raise K8sException(msg
)
266 self
.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid
))
268 # find namespace for tiller pod
269 command
= '{} --kubeconfig={} get deployments --all-namespaces'\
270 .format(self
.kubectl_command
, config_filename
)
271 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
272 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
274 for r
in output_table
:
276 if 'tiller-deploy' in r
[1]:
279 except Exception as e
:
282 msg
= 'Tiller deployment not found in cluster {}'.format(cluster_uuid
)
285 self
.debug('namespace for tiller: {}'.format(namespace
))
287 force_str
= '--force'
290 # delete tiller deployment
291 self
.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid
, namespace
))
292 command
= '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
293 .format(self
.kubectl_command
, namespace
, config_filename
, force_str
)
294 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
296 # uninstall tiller from cluster
297 self
.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid
))
298 command
= '{} --kubeconfig={} --home={} reset'\
299 .format(self
._helm
_command
, config_filename
, helm_dir
)
300 self
.debug('resetting: {}'.format(command
))
301 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
303 self
.debug('namespace not found')
305 # delete cluster directory
306 dir = self
.fs
.path
+ '/' + cluster_uuid
307 self
.debug('Removing directory {}'.format(dir))
308 shutil
.rmtree(dir, ignore_errors
=True)
317 timeout
: float = 300,
322 self
.debug('installing {} in cluster {}'.format(kdu_model
, cluster_uuid
))
325 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
326 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
329 # params_str = K8sHelmConnector._params_to_set_option(params)
330 params_str
, file_to_delete
= self
._params
_to
_file
_option
(cluster_uuid
=cluster_uuid
, params
=params
)
334 timeout_str
= '--timeout {}'.format(timeout
)
339 atomic_str
= '--atomic'
344 parts
= kdu_model
.split(sep
=':')
346 version_str
= '--version {}'.format(parts
[1])
349 # generate a name for the release. Then, check if already exists
351 while kdu_instance
is None:
352 kdu_instance
= K8sHelmConnector
._generate
_release
_name
(kdu_model
)
354 result
= await self
._status
_kdu
(
355 cluster_uuid
=cluster_uuid
,
356 kdu_instance
=kdu_instance
,
359 if result
is not None:
360 # instance already exists: generate a new one
362 except Exception as e
:
366 command
= '{} install {} --output yaml --kubeconfig={} --home={} {} {} --name={} {} {}'\
367 .format(self
._helm
_command
, atomic_str
, config_filename
, helm_dir
,
368 params_str
, timeout_str
, kdu_instance
, kdu_model
, version_str
)
369 self
.debug('installing: {}'.format(command
))
372 # exec helm in a task
373 exec_task
= asyncio
.ensure_future(
374 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
376 # write status in another task
377 status_task
= asyncio
.ensure_future(
378 coro_or_future
=self
._store
_status
(
379 cluster_uuid
=cluster_uuid
,
380 kdu_instance
=kdu_instance
,
387 # wait for execution task
388 await asyncio
.wait([exec_task
])
393 output
, rc
= exec_task
.result()
397 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
399 # remove temporal values yaml file
401 os
.remove(file_to_delete
)
404 await self
._store
_status
(
405 cluster_uuid
=cluster_uuid
,
406 kdu_instance
=kdu_instance
,
414 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
416 raise K8sException(msg
)
418 self
.debug('Returning kdu_instance {}'.format(kdu_instance
))
421 async def instances_list(
426 returns a list of deployed releases in a cluster
428 :param cluster_uuid: the cluster
432 self
.debug('list releases for cluster {}'.format(cluster_uuid
))
435 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
436 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
438 command
= '{} --kubeconfig={} --home={} list --output yaml'\
439 .format(self
._helm
_command
, config_filename
, helm_dir
)
441 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
443 if output
and len(output
) > 0:
444 return yaml
.load(output
, Loader
=yaml
.SafeLoader
).get('Releases')
452 kdu_model
: str = None,
454 timeout
: float = 300,
459 self
.debug('upgrading {} in cluster {}'.format(kdu_model
, cluster_uuid
))
462 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
463 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
466 # params_str = K8sHelmConnector._params_to_set_option(params)
467 params_str
, file_to_delete
= self
._params
_to
_file
_option
(cluster_uuid
=cluster_uuid
, params
=params
)
471 timeout_str
= '--timeout {}'.format(timeout
)
476 atomic_str
= '--atomic'
480 if kdu_model
and ':' in kdu_model
:
481 parts
= kdu_model
.split(sep
=':')
483 version_str
= '--version {}'.format(parts
[1])
487 command
= '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
488 .format(self
._helm
_command
, atomic_str
, config_filename
, helm_dir
,
489 params_str
, timeout_str
, kdu_instance
, kdu_model
, version_str
)
490 self
.debug('upgrading: {}'.format(command
))
494 # exec helm in a task
495 exec_task
= asyncio
.ensure_future(
496 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
498 # write status in another task
499 status_task
= asyncio
.ensure_future(
500 coro_or_future
=self
._store
_status
(
501 cluster_uuid
=cluster_uuid
,
502 kdu_instance
=kdu_instance
,
509 # wait for execution task
510 await asyncio
.wait([exec_task
])
514 output
, rc
= exec_task
.result()
518 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
520 # remove temporal values yaml file
522 os
.remove(file_to_delete
)
525 await self
._store
_status
(
526 cluster_uuid
=cluster_uuid
,
527 kdu_instance
=kdu_instance
,
535 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
537 raise K8sException(msg
)
539 # return new revision number
540 instance
= await self
.get_instance_info(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
542 revision
= int(instance
.get('Revision'))
543 self
.debug('New revision: {}'.format(revision
))
556 self
.debug('rollback kdu_instance {} to revision {} from cluster {}'
557 .format(kdu_instance
, revision
, cluster_uuid
))
560 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
561 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
563 command
= '{} rollback --kubeconfig={} --home={} {} {} --wait'\
564 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
, revision
)
566 # exec helm in a task
567 exec_task
= asyncio
.ensure_future(
568 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
570 # write status in another task
571 status_task
= asyncio
.ensure_future(
572 coro_or_future
=self
._store
_status
(
573 cluster_uuid
=cluster_uuid
,
574 kdu_instance
=kdu_instance
,
576 operation
='rollback',
581 # wait for execution task
582 await asyncio
.wait([exec_task
])
587 output
, rc
= exec_task
.result()
590 await self
._store
_status
(
591 cluster_uuid
=cluster_uuid
,
592 kdu_instance
=kdu_instance
,
594 operation
='rollback',
600 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
602 raise K8sException(msg
)
604 # return new revision number
605 instance
= await self
.get_instance_info(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
607 revision
= int(instance
.get('Revision'))
608 self
.debug('New revision: {}'.format(revision
))
619 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
620 after all _terminate-config-primitive_ of the VNF are invoked).
622 :param cluster_uuid: UUID of a K8s cluster known by OSM
623 :param kdu_instance: unique name for the KDU instance to be deleted
624 :return: True if successful
627 self
.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance
, cluster_uuid
))
630 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
631 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
633 command
= '{} --kubeconfig={} --home={} delete --purge {}'\
634 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
)
636 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
638 return self
._output
_to
_table
(output
)
640 async def inspect_kdu(
646 self
.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model
, repo_url
))
648 return await self
._exec
_inspect
_comand
(inspect_command
='', kdu_model
=kdu_model
, repo_url
=repo_url
)
650 async def values_kdu(
656 self
.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model
, repo_url
))
658 return await self
._exec
_inspect
_comand
(inspect_command
='values', kdu_model
=kdu_model
, repo_url
=repo_url
)
666 self
.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model
, repo_url
))
668 return await self
._exec
_inspect
_comand
(inspect_command
='readme', kdu_model
=kdu_model
, repo_url
=repo_url
)
670 async def status_kdu(
676 # call internal function
677 return await self
._status
_kdu
(
678 cluster_uuid
=cluster_uuid
,
679 kdu_instance
=kdu_instance
,
685 ##################################################################################################
686 ########################################## P R I V A T E #########################################
687 ##################################################################################################
690 async def _exec_inspect_comand(
692 inspect_command
: str,
699 repo_str
= ' --repo {}'.format(repo_url
)
700 idx
= kdu_model
.find('/')
703 kdu_model
= kdu_model
[idx
:]
705 inspect_command
= '{} inspect {} {}{}'.format(self
._helm
_command
, inspect_command
, kdu_model
, repo_str
)
706 output
, rc
= await self
._local
_async
_exec
(command
=inspect_command
, encode_utf8
=True)
710 async def _status_kdu(
714 show_error_log
: bool = False,
715 return_text
: bool = False
718 self
.debug('status of kdu_instance {}'.format(kdu_instance
))
721 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
722 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
724 command
= '{} --kubeconfig={} --home={} status {} --output yaml'\
725 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
)
727 output
, rc
= await self
._local
_async
_exec
(
729 raise_exception_on_error
=True,
730 show_error_log
=show_error_log
739 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
741 # remove field 'notes'
743 del data
.get('info').get('status')['notes']
747 # parse field 'resources'
749 resources
= str(data
.get('info').get('status').get('resources'))
750 resource_table
= self
._output
_to
_table
(resources
)
751 data
.get('info').get('status')['resources'] = resource_table
752 except Exception as e
:
757 async def get_instance_info(
762 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
763 for instance
in instances
:
764 if instance
.get('Name') == kdu_instance
:
766 self
.debug('Instance {} not found'.format(kdu_instance
))
770 def _generate_release_name(
773 # check embeded chart (file or dir)
774 if chart_name
.startswith('/'):
775 # extract file or directory name
776 chart_name
= chart_name
[chart_name
.rfind('/')+1:]
778 elif '://' in chart_name
:
779 # extract last portion of URL
780 chart_name
= chart_name
[chart_name
.rfind('/')+1:]
784 if c
.isalpha() or c
.isnumeric():
791 # if does not start with alpha character, prefix 'a'
792 if not name
[0].isalpha():
797 def get_random_number():
798 r
= random
.randrange(start
=1, stop
=99999999)
803 name
= name
+ get_random_number()
806 async def _store_status(
811 check_every
: float = 10,
812 db_dict
: dict = None,
813 run_once
: bool = False
817 await asyncio
.sleep(check_every
)
818 detailed_status
= await self
.status_kdu(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
819 status
= detailed_status
.get('info').get('Description')
821 self
.debug('STATUS:\n{}'.format(status
))
822 self
.debug('DETAILED STATUS:\n{}'.format(detailed_status
))
825 result
= await self
.write_app_status_to_db(
828 detailed_status
=str(detailed_status
),
831 self
.info('Error writing in database. Task exiting...')
833 except asyncio
.CancelledError
:
834 self
.debug('Task cancelled')
836 except Exception as e
:
842 async def _is_install_completed(
848 status
= await self
._status
_kdu
(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
, return_text
=False)
850 # extract info.status.resources-> str
853 # NAME READY UP-TO-DATE AVAILABLE AGE
854 # halting-horse-mongodb 0/1 1 0 0s
855 # halting-petit-mongodb 1/1 1 0 0s
857 resources
= K8sHelmConnector
._get
_deep
(status
, ('info', 'status', 'resources'))
860 resources
= K8sHelmConnector
._output
_to
_table
(resources
)
862 num_lines
= len(resources
)
864 while index
< num_lines
:
866 line1
= resources
[index
]
868 # find '==>' in column 0
869 if line1
[0] == '==>':
870 line2
= resources
[index
]
872 # find READY in column 1
873 if line2
[1] == 'READY':
875 line3
= resources
[index
]
877 while len(line3
) > 1 and index
< num_lines
:
878 ready_value
= line3
[1]
879 parts
= ready_value
.split(sep
='/')
880 current
= int(parts
[0])
881 total
= int(parts
[1])
883 self
.debug('NOT READY:\n {}'.format(line3
))
885 line3
= resources
[index
]
888 except Exception as e
:
894 def _get_deep(dictionary
: dict, members
: tuple):
899 value
= target
.get(m
)
904 except Exception as e
:
908 # find key:value in several lines
910 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
913 if line
.startswith(p_key
+ ':'):
914 parts
= line
.split(':')
915 the_value
= parts
[1].strip()
917 except Exception as e
:
922 # params for use in -f file
923 # returns values file option and filename (in order to delete it at the end)
924 def _params_to_file_option(self
, cluster_uuid
: str, params
: dict) -> (str, str):
926 if params
and len(params
) > 0:
927 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
928 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
930 def get_random_number():
931 r
= random
.randrange(start
=1, stop
=99999999)
939 value
= params
.get(key
)
940 if '!!yaml' in str(value
):
941 value
= yaml
.load(value
[7:])
944 values_file
= get_random_number() + '.yaml'
945 with
open(values_file
, 'w') as stream
:
946 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
948 return '-f {}'.format(values_file
), values_file
952 # params for use in --set option
954 def _params_to_set_option(params
: dict) -> str:
956 if params
and len(params
) > 0:
959 value
= params
.get(key
, None)
960 if value
is not None:
962 params_str
+= '--set '
966 params_str
+= '{}={}'.format(key
, value
)
970 def _output_to_lines(output
: str) -> list:
971 output_lines
= list()
972 lines
= output
.splitlines(keepends
=False)
976 output_lines
.append(line
)
980 def _output_to_table(output
: str) -> list:
981 output_table
= list()
982 lines
= output
.splitlines(keepends
=False)
984 line
= line
.replace('\t', ' ')
986 output_table
.append(line_list
)
987 cells
= line
.split(sep
=' ')
991 line_list
.append(cell
)
994 def _get_paths(self
, cluster_name
: str, create_if_not_exist
: bool = False) -> (str, str, str, str):
996 Returns kube and helm directories
999 :param create_if_not_exist:
1000 :return: kube, helm directories, config filename and cluster dir.
1001 Raises exception if not exist and cannot create
1005 if base
.endswith("/") or base
.endswith("\\"):
1008 # base dir for cluster
1009 cluster_dir
= base
+ '/' + cluster_name
1010 if create_if_not_exist
and not os
.path
.exists(cluster_dir
):
1011 self
.debug('Creating dir {}'.format(cluster_dir
))
1012 os
.makedirs(cluster_dir
)
1013 if not os
.path
.exists(cluster_dir
):
1014 msg
= 'Base cluster dir {} does not exist'.format(cluster_dir
)
1016 raise K8sException(msg
)
1019 kube_dir
= cluster_dir
+ '/' + '.kube'
1020 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
1021 self
.debug('Creating dir {}'.format(kube_dir
))
1022 os
.makedirs(kube_dir
)
1023 if not os
.path
.exists(kube_dir
):
1024 msg
= 'Kube config dir {} does not exist'.format(kube_dir
)
1026 raise K8sException(msg
)
1029 helm_dir
= cluster_dir
+ '/' + '.helm'
1030 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
1031 self
.debug('Creating dir {}'.format(helm_dir
))
1032 os
.makedirs(helm_dir
)
1033 if not os
.path
.exists(helm_dir
):
1034 msg
= 'Helm config dir {} does not exist'.format(helm_dir
)
1036 raise K8sException(msg
)
1038 config_filename
= kube_dir
+ '/config'
1039 return kube_dir
, helm_dir
, config_filename
, cluster_dir
1042 def _remove_multiple_spaces(str):
1045 str = str.replace(' ', ' ')
1052 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1053 self
.debug('Executing sync local command: {}'.format(command
))
1054 # raise exception if fails
1057 output
= subprocess
.check_output(command
, shell
=True, universal_newlines
=True)
1060 except Exception as e
:
1063 return output
, return_code
1065 async def _local_async_exec(
1068 raise_exception_on_error
: bool = False,
1069 show_error_log
: bool = True,
1070 encode_utf8
: bool = False
1073 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1074 self
.debug('Executing async local command: {}'.format(command
))
1077 command
= command
.split(sep
=' ')
1080 process
= await asyncio
.create_subprocess_exec(
1082 stdout
=asyncio
.subprocess
.PIPE
,
1083 stderr
=asyncio
.subprocess
.PIPE
1086 # wait for command terminate
1087 stdout
, stderr
= await process
.communicate()
1089 return_code
= process
.returncode
1093 output
= stdout
.decode('utf-8').strip()
1094 # output = stdout.decode()
1096 output
= stderr
.decode('utf-8').strip()
1097 # output = stderr.decode()
1099 if return_code
!= 0 and show_error_log
:
1100 self
.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code
, output
))
1102 self
.debug('Return code: {}'.format(return_code
))
1104 if raise_exception_on_error
and return_code
!= 0:
1105 raise Exception(output
)
1108 output
= output
.encode('utf-8').strip()
1109 output
= str(output
).replace('\\n', '\n')
1111 return output
, return_code
1113 except Exception as e
:
1114 msg
= 'Exception executing command: {} -> {}'.format(command
, e
)
1117 if raise_exception_on_error
:
1131 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1132 self
.debug('Executing sync remote ssh command: {}'.format(command
))
1134 ssh
= paramiko
.SSHClient()
1135 ssh
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
1136 ssh
.connect(hostname
=hostname
, username
=username
, password
=password
)
1137 ssh_stdin
, ssh_stdout
, ssh_stderr
= ssh
.exec_command(command
=command
, timeout
=timeout
)
1138 output
= ssh_stdout
.read().decode('utf-8')
1139 error
= ssh_stderr
.read().decode('utf-8')
1141 self
.error('ERROR: {}'.format(error
))
1145 output
= output
.replace('\\n', '\n')
1146 self
.debug('OUTPUT: {}'.format(output
))
1148 return output
, return_code
1150 def _check_file_exists(self
, filename
: str, exception_if_not_exists
: bool = False):
1151 self
.debug('Checking if file {} exists...'.format(filename
))
1152 if os
.path
.exists(filename
):
1155 msg
= 'File {} does not exist'.format(filename
)
1156 if exception_if_not_exists
:
1158 raise K8sException(msg
)