2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
29 from uuid
import uuid4
31 from n2vc
.k8s_conn
import K8sConnector
32 from n2vc
.exceptions
import K8sException
35 class K8sHelmConnector(K8sConnector
):
38 ##################################################################################################
39 ########################################## P U B L I C ###########################################
40 ##################################################################################################
47 kubectl_command
: str = '/usr/bin/kubectl',
48 helm_command
: str = '/usr/bin/helm',
54 :param fs: file system for kubernetes and helm configuration
55 :param db: database object to write current operation status
56 :param kubectl_command: path to kubectl executable
57 :param helm_command: path to helm executable
59 :param on_update_db: callback called when k8s connector updates database
63 K8sConnector
.__init
__(
67 on_update_db
=on_update_db
70 self
.log
.info('Initializing K8S Helm connector')
72 # random numbers for release name generation
73 random
.seed(time
.time())
78 # exception if kubectl is not installed
79 self
.kubectl_command
= kubectl_command
80 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
82 # exception if helm is not installed
83 self
._helm
_command
= helm_command
84 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
86 # initialize helm client-only
87 self
.log
.debug('Initializing helm client-only...')
88 command
= '{} init --client-only'.format(self
._helm
_command
)
90 asyncio
.ensure_future(self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False))
91 # loop = asyncio.get_event_loop()
92 # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
93 except Exception as e
:
94 self
.warning(msg
='helm init failed (it was already initialized): {}'.format(e
))
96 self
.log
.info('K8S Helm connector initialized')
101 namespace
: str = 'kube-system',
102 reuse_cluster_uuid
=None
105 It prepares a given K8s cluster environment to run Charts on both sides:
109 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
110 :param namespace: optional namespace to be used for helm. By default, 'kube-system' will be used
111 :param reuse_cluster_uuid: existing cluster uuid for reuse
112 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
113 (on error, an exception will be raised)
116 cluster_uuid
= reuse_cluster_uuid
118 cluster_uuid
= str(uuid4())
120 self
.log
.debug('Initializing K8S environment. namespace: {}'.format(namespace
))
122 # create config filename
123 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
124 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
125 f
= open(config_filename
, "w")
129 # check if tiller pod is up in cluster
130 command
= '{} --kubeconfig={} --namespace={} get deployments'\
131 .format(self
.kubectl_command
, config_filename
, namespace
)
132 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
134 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
136 # find 'tiller' pod in all pods
137 already_initialized
= False
139 for row
in output_table
:
140 if row
[0].startswith('tiller-deploy'):
141 already_initialized
= True
143 except Exception as e
:
147 n2vc_installed_sw
= False
148 if not already_initialized
:
149 self
.log
.info('Initializing helm in client and server: {}'.format(cluster_uuid
))
150 command
= '{} --kubeconfig={} --tiller-namespace={} --home={} init'\
151 .format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
152 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
153 n2vc_installed_sw
= True
155 # check client helm installation
156 check_file
= helm_dir
+ '/repository/repositories.yaml'
157 if not self
._check
_file
_exists
(filename
=check_file
, exception_if_not_exists
=False):
158 self
.log
.info('Initializing helm in client: {}'.format(cluster_uuid
))
159 command
= '{} --kubeconfig={} --tiller-namespace={} --home={} init --client-only'\
160 .format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
161 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
163 self
.log
.info('Helm client already initialized')
165 self
.log
.info('Cluster initialized {}'.format(cluster_uuid
))
167 return cluster_uuid
, n2vc_installed_sw
174 repo_type
: str = 'chart'
177 self
.log
.debug('adding {} repository {}. URL: {}'.format(repo_type
, name
, url
))
180 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
181 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
184 command
= '{} --kubeconfig={} --home={} repo update'.format(self
._helm
_command
, config_filename
, helm_dir
)
185 self
.log
.debug('updating repo: {}'.format(command
))
186 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
188 # helm repo add name url
189 command
= '{} --kubeconfig={} --home={} repo add {} {}'\
190 .format(self
._helm
_command
, config_filename
, helm_dir
, name
, url
)
191 self
.log
.debug('adding repo: {}'.format(command
))
192 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
199 Get the list of registered repositories
201 :return: list of registered repositories: [ (name, url) .... ]
204 self
.log
.debug('list repositories for cluster {}'.format(cluster_uuid
))
207 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
208 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
210 command
= '{} --kubeconfig={} --home={} repo list --output yaml'\
211 .format(self
._helm
_command
, config_filename
, helm_dir
)
213 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
214 if output
and len(output
) > 0:
215 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
async def repo_remove(
        self,
        cluster_uuid: str,
        name: str
):
    """
    Remove a repository from OSM

    :param cluster_uuid: the cluster
    :param name: repo name in OSM
    :return: True if successful
    """

    self.log.debug('list repositories for cluster {}'.format(cluster_uuid))

    # config filename (ensures the cluster working dirs exist)
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} repo remove {}'\
        .format(self._helm_command, config_filename, helm_dir, name)
    # raise on error so the caller learns about a failed removal
    await self._local_async_exec(command=command, raise_exception_on_error=True)

    return True
247 uninstall_sw
: bool = False
250 self
.log
.debug('Resetting K8s environment. cluster uuid: {}'.format(cluster_uuid
))
252 # get kube and helm directories
253 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
254 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=False)
256 # uninstall releases if needed
257 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
258 if len(releases
) > 0:
262 kdu_instance
= r
.get('Name')
263 chart
= r
.get('Chart')
264 self
.log
.debug('Uninstalling {} -> {}'.format(chart
, kdu_instance
))
265 await self
.uninstall(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
266 except Exception as e
:
267 self
.log
.error('Error uninstalling release {}: {}'.format(kdu_instance
, e
))
269 msg
= 'Cluster has releases and not force. Cannot reset K8s environment. Cluster uuid: {}'\
270 .format(cluster_uuid
)
272 raise K8sException(msg
)
276 self
.log
.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid
))
278 # find namespace for tiller pod
279 command
= '{} --kubeconfig={} get deployments --all-namespaces'\
280 .format(self
.kubectl_command
, config_filename
)
281 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
282 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
284 for r
in output_table
:
286 if 'tiller-deploy' in r
[1]:
289 except Exception as e
:
292 msg
= 'Tiller deployment not found in cluster {}'.format(cluster_uuid
)
295 self
.log
.debug('namespace for tiller: {}'.format(namespace
))
297 force_str
= '--force'
300 # delete tiller deployment
301 self
.log
.debug('Deleting tiller deployment for cluster {}, namespace {}'.format(cluster_uuid
, namespace
))
302 command
= '{} --namespace {} --kubeconfig={} {} delete deployment tiller-deploy'\
303 .format(self
.kubectl_command
, namespace
, config_filename
, force_str
)
304 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
306 # uninstall tiller from cluster
307 self
.log
.debug('Uninstalling tiller from cluster {}'.format(cluster_uuid
))
308 command
= '{} --kubeconfig={} --home={} reset'\
309 .format(self
._helm
_command
, config_filename
, helm_dir
)
310 self
.log
.debug('resetting: {}'.format(command
))
311 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
313 self
.log
.debug('namespace not found')
315 # delete cluster directory
316 dir = self
.fs
.path
+ '/' + cluster_uuid
317 self
.log
.debug('Removing directory {}'.format(dir))
318 shutil
.rmtree(dir, ignore_errors
=True)
327 timeout
: float = 300,
329 db_dict
: dict = None,
330 kdu_name
: str = None,
331 namespace
: str = None
334 self
.log
.debug('installing {} in cluster {}'.format(kdu_model
, cluster_uuid
))
337 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
338 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
341 # params_str = K8sHelmConnector._params_to_set_option(params)
342 params_str
, file_to_delete
= self
._params
_to
_file
_option
(cluster_uuid
=cluster_uuid
, params
=params
)
346 timeout_str
= '--timeout {}'.format(timeout
)
351 atomic_str
= '--atomic'
355 namespace_str
= "--namespace {}".format(namespace
)
360 parts
= kdu_model
.split(sep
=':')
362 version_str
= '--version {}'.format(parts
[1])
365 # generate a name for the release. Then, check if already exists
367 while kdu_instance
is None:
368 kdu_instance
= K8sHelmConnector
._generate
_release
_name
(kdu_model
)
370 result
= await self
._status
_kdu
(
371 cluster_uuid
=cluster_uuid
,
372 kdu_instance
=kdu_instance
,
375 if result
is not None:
376 # instance already exists: generate a new one
382 command
= '{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} {params} {timeout} ' \
383 '--name={name} {ns} {model} {ver}'.format(helm
=self
._helm
_command
, atomic
=atomic_str
,
384 config
=config_filename
, dir=helm_dir
, params
=params_str
,
385 timeout
=timeout_str
, name
=kdu_instance
, ns
=namespace_str
,
386 model
=kdu_model
, ver
=version_str
)
387 self
.log
.debug('installing: {}'.format(command
))
390 # exec helm in a task
391 exec_task
= asyncio
.ensure_future(
392 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
395 # write status in another task
396 status_task
= asyncio
.ensure_future(
397 coro_or_future
=self
._store
_status
(
398 cluster_uuid
=cluster_uuid
,
399 kdu_instance
=kdu_instance
,
406 # wait for execution task
407 await asyncio
.wait([exec_task
])
412 output
, rc
= exec_task
.result()
416 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
418 # remove temporal values yaml file
420 os
.remove(file_to_delete
)
423 await self
._store
_status
(
424 cluster_uuid
=cluster_uuid
,
425 kdu_instance
=kdu_instance
,
433 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
435 raise K8sException(msg
)
437 self
.log
.debug('Returning kdu_instance {}'.format(kdu_instance
))
async def instances_list(
        self,
        cluster_uuid: str
) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the cluster
    :return: list of release dicts as emitted by `helm list --output yaml`
             (empty list when there are no releases)
    """

    self.log.debug('list releases for cluster {}'.format(cluster_uuid))

    # config filename (ensures the cluster working dirs exist)
    kube_dir, helm_dir, config_filename, cluster_dir = \
        self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)

    command = '{} --kubeconfig={} --home={} list --output yaml'\
        .format(self._helm_command, config_filename, helm_dir)

    output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)

    if output and len(output) > 0:
        # helm prints a yaml document with a top-level 'Releases' list
        return yaml.load(output, Loader=yaml.SafeLoader).get('Releases')
    else:
        # explicit empty list so callers can safely do len()/iteration
        return []
471 kdu_model
: str = None,
473 timeout
: float = 300,
478 self
.log
.debug('upgrading {} in cluster {}'.format(kdu_model
, cluster_uuid
))
481 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
482 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
485 # params_str = K8sHelmConnector._params_to_set_option(params)
486 params_str
, file_to_delete
= self
._params
_to
_file
_option
(cluster_uuid
=cluster_uuid
, params
=params
)
490 timeout_str
= '--timeout {}'.format(timeout
)
495 atomic_str
= '--atomic'
499 if kdu_model
and ':' in kdu_model
:
500 parts
= kdu_model
.split(sep
=':')
502 version_str
= '--version {}'.format(parts
[1])
506 command
= '{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}'\
507 .format(self
._helm
_command
, atomic_str
, config_filename
, helm_dir
,
508 params_str
, timeout_str
, kdu_instance
, kdu_model
, version_str
)
509 self
.log
.debug('upgrading: {}'.format(command
))
513 # exec helm in a task
514 exec_task
= asyncio
.ensure_future(
515 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
517 # write status in another task
518 status_task
= asyncio
.ensure_future(
519 coro_or_future
=self
._store
_status
(
520 cluster_uuid
=cluster_uuid
,
521 kdu_instance
=kdu_instance
,
528 # wait for execution task
529 await asyncio
.wait([exec_task
])
533 output
, rc
= exec_task
.result()
537 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
539 # remove temporal values yaml file
541 os
.remove(file_to_delete
)
544 await self
._store
_status
(
545 cluster_uuid
=cluster_uuid
,
546 kdu_instance
=kdu_instance
,
554 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
556 raise K8sException(msg
)
558 # return new revision number
559 instance
= await self
.get_instance_info(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
561 revision
= int(instance
.get('Revision'))
562 self
.log
.debug('New revision: {}'.format(revision
))
575 self
.log
.debug('rollback kdu_instance {} to revision {} from cluster {}'
576 .format(kdu_instance
, revision
, cluster_uuid
))
579 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
580 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
582 command
= '{} rollback --kubeconfig={} --home={} {} {} --wait'\
583 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
, revision
)
585 # exec helm in a task
586 exec_task
= asyncio
.ensure_future(
587 coro_or_future
=self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
589 # write status in another task
590 status_task
= asyncio
.ensure_future(
591 coro_or_future
=self
._store
_status
(
592 cluster_uuid
=cluster_uuid
,
593 kdu_instance
=kdu_instance
,
595 operation
='rollback',
600 # wait for execution task
601 await asyncio
.wait([exec_task
])
606 output
, rc
= exec_task
.result()
609 await self
._store
_status
(
610 cluster_uuid
=cluster_uuid
,
611 kdu_instance
=kdu_instance
,
613 operation
='rollback',
619 msg
= 'Error executing command: {}\nOutput: {}'.format(command
, output
)
621 raise K8sException(msg
)
623 # return new revision number
624 instance
= await self
.get_instance_info(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
626 revision
= int(instance
.get('Revision'))
627 self
.log
.debug('New revision: {}'.format(revision
))
638 Removes an existing KDU instance. It would implicitly use the `delete` call (this call would happen
639 after all _terminate-config-primitive_ of the VNF are invoked).
641 :param cluster_uuid: UUID of a K8s cluster known by OSM
642 :param kdu_instance: unique name for the KDU instance to be deleted
643 :return: True if successful
646 self
.log
.debug('uninstall kdu_instance {} from cluster {}'.format(kdu_instance
, cluster_uuid
))
649 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
650 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
652 command
= '{} --kubeconfig={} --home={} delete --purge {}'\
653 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
)
655 output
, rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
657 return self
._output
_to
_table
(output
)
async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :db_dict: Dictionary for any additional data

    :return: Returns the output of the action
    """
    # Helm-based KDUs have no notion of Juju actions: only the native helm
    # operations (rollback, upgrade, status) are exposed by this connector.
    raise K8sException("KDUs deployed with Helm don't support actions "
                       "different from rollback, upgrade and status")
async def inspect_kdu(
        self,
        kdu_model: str,
        repo_url: str = None
) -> str:
    """Return the chart metadata (`helm inspect <chart>`) for a kdu model.

    :param kdu_model: chart reference, optionally prefixed with a repo alias
    :param repo_url: optional repository URL to inspect against
    :return: raw output of the helm inspect command
    """

    self.log.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))

    return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
async def values_kdu(
        self,
        kdu_model: str,
        repo_url: str = None
) -> str:
    """Return the default values (`helm inspect values <chart>`) of a kdu model.

    :param kdu_model: chart reference, optionally prefixed with a repo alias
    :param repo_url: optional repository URL to inspect against
    :return: raw output of the helm inspect values command
    """

    self.log.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))

    return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
708 self
.log
.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model
, repo_url
))
710 return await self
._exec
_inspect
_comand
(inspect_command
='readme', kdu_model
=kdu_model
, repo_url
=repo_url
)
async def status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str
) -> str:
    """Return the status of a deployed release (public wrapper).

    :param cluster_uuid: cluster where the release lives
    :param kdu_instance: release name
    :return: status as returned by the internal _status_kdu helper
    """

    # call internal function
    return await self._status_kdu(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        show_error_log=True,  # assumed defaults for the public entry point -- TODO confirm
        return_text=True
    )
726 async def synchronize_repos(self
, cluster_uuid
: str):
728 self
.log
.debug("syncronize repos for cluster helm-id: {}",)
730 update_repos_timeout
= 300 # max timeout to sync a single repos, more than this is too much
731 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_admin.helm-chart.id": cluster_uuid
})
733 nbi_repo_list
= db_k8scluster
.get("_admin").get("helm_chart_repos") or []
734 cluster_repo_dict
= db_k8scluster
.get("_admin").get("helm_charts_added") or {}
735 # elements that must be deleted
736 deleted_repo_list
= []
738 self
.log
.debug("helm_chart_repos: {}".format(nbi_repo_list
))
739 self
.log
.debug("helm_charts_added: {}".format(cluster_repo_dict
))
741 # obtain repos to add: registered by nbi but not added
742 repos_to_add
= [repo
for repo
in nbi_repo_list
if not cluster_repo_dict
.get(repo
)]
744 # obtain repos to delete: added by cluster but not in nbi list
745 repos_to_delete
= [repo
for repo
in cluster_repo_dict
.keys() if repo
not in nbi_repo_list
]
747 # delete repos: must delete first then add because there may be different repos with same name but
748 # different id and url
749 self
.log
.debug("repos to delete: {}".format(repos_to_delete
))
750 for repo_id
in repos_to_delete
:
751 # try to delete repos
753 repo_delete_task
= asyncio
.ensure_future(self
.repo_remove(cluster_uuid
=cluster_uuid
,
754 name
=cluster_repo_dict
[repo_id
]))
755 await asyncio
.wait_for(repo_delete_task
, update_repos_timeout
)
756 except Exception as e
:
757 self
.warning("Error deleting repo, id: {}, name: {}, err_msg: {}".format(repo_id
,
758 cluster_repo_dict
[repo_id
], str(e
)))
759 # always add to the list of to_delete if there is an error because if is not there deleting raises error
760 deleted_repo_list
.append(repo_id
)
763 self
.log
.debug("repos to add: {}".format(repos_to_add
))
765 for repo_id
in repos_to_add
:
766 # obtain the repo data from the db
767 # if there is an error getting the repo in the database we will ignore this repo and continue
768 # because there is a possible race condition where the repo has been deleted while processing
769 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
770 self
.log
.debug("obtained repo: id, {}, name: {}, url: {}".format(repo_id
, db_repo
["name"], db_repo
["url"]))
772 repo_add_task
= asyncio
.ensure_future(self
.repo_add(cluster_uuid
=cluster_uuid
,
773 name
=db_repo
["name"], url
=db_repo
["url"],
775 await asyncio
.wait_for(repo_add_task
, update_repos_timeout
)
776 added_repo_dict
[repo_id
] = db_repo
["name"]
777 self
.log
.debug("added repo: id, {}, name: {}".format(repo_id
, db_repo
["name"]))
778 except Exception as e
:
779 # deal with error adding repo, adding a repo that already exists does not raise any error
780 # will not raise error because a wrong repos added by anyone could prevent instantiating any ns
781 self
.log
.error("Error adding repo id: {}, err_msg: {} ".format(repo_id
, repr(e
)))
783 return deleted_repo_list
, added_repo_dict
785 else: # else db_k8scluster does not exist
786 raise K8sException("k8cluster with helm-id : {} not found".format(cluster_uuid
))
788 except Exception as e
:
789 self
.log
.error("Error synchronizing repos: {}".format(str(e
)))
790 raise K8sException("Error synchronizing repos")
793 ##################################################################################################
794 ########################################## P R I V A T E #########################################
795 ##################################################################################################
798 async def _exec_inspect_comand(
800 inspect_command
: str,
807 repo_str
= ' --repo {}'.format(repo_url
)
808 idx
= kdu_model
.find('/')
811 kdu_model
= kdu_model
[idx
:]
813 inspect_command
= '{} inspect {} {}{}'.format(self
._helm
_command
, inspect_command
, kdu_model
, repo_str
)
814 output
, rc
= await self
._local
_async
_exec
(command
=inspect_command
, encode_utf8
=True)
818 async def _status_kdu(
822 show_error_log
: bool = False,
823 return_text
: bool = False
826 self
.log
.debug('status of kdu_instance {}'.format(kdu_instance
))
829 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
830 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
832 command
= '{} --kubeconfig={} --home={} status {} --output yaml'\
833 .format(self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
)
835 output
, rc
= await self
._local
_async
_exec
(
837 raise_exception_on_error
=True,
838 show_error_log
=show_error_log
847 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
849 # remove field 'notes'
851 del data
.get('info').get('status')['notes']
855 # parse field 'resources'
857 resources
= str(data
.get('info').get('status').get('resources'))
858 resource_table
= self
._output
_to
_table
(resources
)
859 data
.get('info').get('status')['resources'] = resource_table
860 except Exception as e
:
async def get_instance_info(
        self,
        cluster_uuid: str,
        kdu_instance: str
):
    """Return the `helm list` entry for *kdu_instance*, or None if not deployed.

    :param cluster_uuid: cluster to search in
    :param kdu_instance: release name to look for
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get('Name') == kdu_instance:
            return instance
    self.log.debug('Instance {} not found'.format(kdu_instance))
    # explicit None: the release is not present in the cluster
    return None
878 def _generate_release_name(
881 # check embeded chart (file or dir)
882 if chart_name
.startswith('/'):
883 # extract file or directory name
884 chart_name
= chart_name
[chart_name
.rfind('/')+1:]
886 elif '://' in chart_name
:
887 # extract last portion of URL
888 chart_name
= chart_name
[chart_name
.rfind('/')+1:]
892 if c
.isalpha() or c
.isnumeric():
899 # if does not start with alpha character, prefix 'a'
900 if not name
[0].isalpha():
905 def get_random_number():
906 r
= random
.randrange(start
=1, stop
=99999999)
911 name
= name
+ get_random_number()
914 async def _store_status(
919 check_every
: float = 10,
920 db_dict
: dict = None,
921 run_once
: bool = False
925 await asyncio
.sleep(check_every
)
926 detailed_status
= await self
.status_kdu(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
)
927 status
= detailed_status
.get('info').get('Description')
928 self
.log
.debug('STATUS:\n{}'.format(status
))
929 self
.log
.debug('DETAILED STATUS:\n{}'.format(detailed_status
))
931 result
= await self
.write_app_status_to_db(
934 detailed_status
=str(detailed_status
),
937 self
.log
.info('Error writing in database. Task exiting...')
939 except asyncio
.CancelledError
:
940 self
.log
.debug('Task cancelled')
942 except Exception as e
:
943 self
.log
.debug('_store_status exception: {}'.format(str(e
)))
949 async def _is_install_completed(
955 status
= await self
._status
_kdu
(cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
, return_text
=False)
957 # extract info.status.resources-> str
960 # NAME READY UP-TO-DATE AVAILABLE AGE
961 # halting-horse-mongodb 0/1 1 0 0s
962 # halting-petit-mongodb 1/1 1 0 0s
964 resources
= K8sHelmConnector
._get
_deep
(status
, ('info', 'status', 'resources'))
967 resources
= K8sHelmConnector
._output
_to
_table
(resources
)
969 num_lines
= len(resources
)
971 while index
< num_lines
:
973 line1
= resources
[index
]
975 # find '==>' in column 0
976 if line1
[0] == '==>':
977 line2
= resources
[index
]
979 # find READY in column 1
980 if line2
[1] == 'READY':
982 line3
= resources
[index
]
984 while len(line3
) > 1 and index
< num_lines
:
985 ready_value
= line3
[1]
986 parts
= ready_value
.split(sep
='/')
987 current
= int(parts
[0])
988 total
= int(parts
[1])
990 self
.log
.debug('NOT READY:\n {}'.format(line3
))
992 line3
= resources
[index
]
995 except Exception as e
:
1001 def _get_deep(dictionary
: dict, members
: tuple):
1006 value
= target
.get(m
)
1011 except Exception as e
:
1015 # find key:value in several lines
1017 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1018 for line
in p_lines
:
1020 if line
.startswith(p_key
+ ':'):
1021 parts
= line
.split(':')
1022 the_value
= parts
[1].strip()
1024 except Exception as e
:
1029 # params for use in -f file
1030 # returns values file option and filename (in order to delete it at the end)
1031 def _params_to_file_option(self
, cluster_uuid
: str, params
: dict) -> (str, str):
1033 if params
and len(params
) > 0:
1034 kube_dir
, helm_dir
, config_filename
, cluster_dir
= \
1035 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
1037 def get_random_number():
1038 r
= random
.randrange(start
=1, stop
=99999999)
1046 value
= params
.get(key
)
1047 if '!!yaml' in str(value
):
1048 value
= yaml
.load(value
[7:])
1049 params2
[key
] = value
1051 values_file
= get_random_number() + '.yaml'
1052 with
open(values_file
, 'w') as stream
:
1053 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1055 return '-f {}'.format(values_file
), values_file
1059 # params for use in --set option
1061 def _params_to_set_option(params
: dict) -> str:
1063 if params
and len(params
) > 0:
1066 value
= params
.get(key
, None)
1067 if value
is not None:
1069 params_str
+= '--set '
1073 params_str
+= '{}={}'.format(key
, value
)
1077 def _output_to_lines(output
: str) -> list:
1078 output_lines
= list()
1079 lines
= output
.splitlines(keepends
=False)
1083 output_lines
.append(line
)
1087 def _output_to_table(output
: str) -> list:
1088 output_table
= list()
1089 lines
= output
.splitlines(keepends
=False)
1091 line
= line
.replace('\t', ' ')
1093 output_table
.append(line_list
)
1094 cells
= line
.split(sep
=' ')
1098 line_list
.append(cell
)
1101 def _get_paths(self
, cluster_name
: str, create_if_not_exist
: bool = False) -> (str, str, str, str):
1103 Returns kube and helm directories
1105 :param cluster_name:
1106 :param create_if_not_exist:
1107 :return: kube, helm directories, config filename and cluster dir.
1108 Raises exception if not exist and cannot create
1112 if base
.endswith("/") or base
.endswith("\\"):
1115 # base dir for cluster
1116 cluster_dir
= base
+ '/' + cluster_name
1117 if create_if_not_exist
and not os
.path
.exists(cluster_dir
):
1118 self
.log
.debug('Creating dir {}'.format(cluster_dir
))
1119 os
.makedirs(cluster_dir
)
1120 if not os
.path
.exists(cluster_dir
):
1121 msg
= 'Base cluster dir {} does not exist'.format(cluster_dir
)
1123 raise K8sException(msg
)
1126 kube_dir
= cluster_dir
+ '/' + '.kube'
1127 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
1128 self
.log
.debug('Creating dir {}'.format(kube_dir
))
1129 os
.makedirs(kube_dir
)
1130 if not os
.path
.exists(kube_dir
):
1131 msg
= 'Kube config dir {} does not exist'.format(kube_dir
)
1133 raise K8sException(msg
)
1136 helm_dir
= cluster_dir
+ '/' + '.helm'
1137 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
1138 self
.log
.debug('Creating dir {}'.format(helm_dir
))
1139 os
.makedirs(helm_dir
)
1140 if not os
.path
.exists(helm_dir
):
1141 msg
= 'Helm config dir {} does not exist'.format(helm_dir
)
1143 raise K8sException(msg
)
1145 config_filename
= kube_dir
+ '/config'
1146 return kube_dir
, helm_dir
, config_filename
, cluster_dir
1149 def _remove_multiple_spaces(str):
1152 str = str.replace(' ', ' ')
1159 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1160 self
.log
.debug('Executing sync local command: {}'.format(command
))
1161 # raise exception if fails
1164 output
= subprocess
.check_output(command
, shell
=True, universal_newlines
=True)
1166 self
.log
.debug(output
)
1167 except Exception as e
:
1170 return output
, return_code
async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False
) -> (str, int):
    """Run *command* as a local subprocess and return (output, return_code).

    :param command: command string; split on single spaces (no shell)
    :param raise_exception_on_error: raise K8sException on non-zero exit
    :param show_error_log: log failing commands' output at debug level
    :param encode_utf8: post-process output via utf-8 encode (legacy behavior)
    :return: (output, return_code); ('', -1) on unexpected execution errors
             when raise_exception_on_error is False
    """

    command = K8sHelmConnector._remove_multiple_spaces(command)
    self.log.debug('Executing async local command: {}'.format(command))

    # split command into an argv list (no shell involved)
    command = command.split(sep=' ')

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        output = ''
        if stdout:
            output = stdout.decode('utf-8').strip()
            # output = stdout.decode()
        if stderr:
            # NOTE(review): stderr overwrites stdout here -- preserved
            # from the original; confirm this is the intended behavior
            output = stderr.decode('utf-8').strip()
            # output = stderr.decode()

        if return_code != 0 and show_error_log:
            self.log.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
        else:
            self.log.debug('Return code: {}'.format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode('utf-8').strip()
            output = str(output).replace('\\n', '\n')

        return output, return_code

    except asyncio.CancelledError:
        # never swallow task cancellation
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = 'Exception executing command: {} -> {}'.format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return '', -1
1232 def _check_file_exists(self
, filename
: str, exception_if_not_exists
: bool = False):
1233 # self.log.debug('Checking if file {} exists...'.format(filename))
1234 if os
.path
.exists(filename
):
1237 msg
= 'File {} does not exist'.format(filename
)
1238 if exception_if_not_exists
:
1239 # self.log.error(msg)
1240 raise K8sException(msg
)