2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
import asyncio
import os
import random
import shutil
import subprocess
import time
from uuid import uuid4

import yaml

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
36 class K8sHelmConnector(K8sConnector
):
39 ####################################################################################
40 ################################### P U B L I C ####################################
41 ####################################################################################
48 kubectl_command
: str = "/usr/bin/kubectl",
49 helm_command
: str = "/usr/bin/helm",
55 :param fs: file system for kubernetes and helm configuration
56 :param db: database object to write current operation status
57 :param kubectl_command: path to kubectl executable
58 :param helm_command: path to helm executable
60 :param on_update_db: callback called when k8s connector updates database
64 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
66 self
.log
.info("Initializing K8S Helm connector")
68 # random numbers for release name generation
69 random
.seed(time
.time())
74 # exception if kubectl is not installed
75 self
.kubectl_command
= kubectl_command
76 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
78 # exception if helm is not installed
79 self
._helm
_command
= helm_command
80 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
82 # initialize helm client-only
83 self
.log
.debug("Initializing helm client-only...")
84 command
= "{} init --client-only".format(self
._helm
_command
)
86 asyncio
.ensure_future(
87 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
89 # loop = asyncio.get_event_loop()
90 # loop.run_until_complete(self._local_async_exec(command=command,
91 # raise_exception_on_error=False))
92 except Exception as e
:
94 msg
="helm init failed (it was already initialized): {}".format(e
)
97 self
.log
.info("K8S Helm connector initialized")
100 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None
103 It prepares a given K8s cluster environment to run Charts on both sides:
107 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
109 :param namespace: optional namespace to be used for helm. By default,
110 'kube-system' will be used
111 :param reuse_cluster_uuid: existing cluster uuid for reuse
112 :return: uuid of the K8s cluster and True if connector has installed some
113 software in the cluster
114 (on error, an exception will be raised)
117 cluster_uuid
= reuse_cluster_uuid
119 cluster_uuid
= str(uuid4())
121 self
.log
.debug("Initializing K8S environment. namespace: {}".format(namespace
))
123 # create config filename
124 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
125 cluster_name
=cluster_uuid
, create_if_not_exist
=True
127 f
= open(config_filename
, "w")
131 # check if tiller pod is up in cluster
132 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
133 self
.kubectl_command
, config_filename
, namespace
135 output
, _rc
= await self
._local
_async
_exec
(
136 command
=command
, raise_exception_on_error
=True
139 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
141 # find 'tiller' pod in all pods
142 already_initialized
= False
144 for row
in output_table
:
145 if row
[0].startswith("tiller-deploy"):
146 already_initialized
= True
152 n2vc_installed_sw
= False
153 if not already_initialized
:
155 "Initializing helm in client and server: {}".format(cluster_uuid
)
157 command
= "{} --kubeconfig={} --tiller-namespace={} --home={} init".format(
158 self
._helm
_command
, config_filename
, namespace
, helm_dir
160 output
, _rc
= await self
._local
_async
_exec
(
161 command
=command
, raise_exception_on_error
=True
163 n2vc_installed_sw
= True
165 # check client helm installation
166 check_file
= helm_dir
+ "/repository/repositories.yaml"
167 if not self
._check
_file
_exists
(
168 filename
=check_file
, exception_if_not_exists
=False
170 self
.log
.info("Initializing helm in client: {}".format(cluster_uuid
))
172 "{} --kubeconfig={} --tiller-namespace={} "
173 "--home={} init --client-only"
174 ).format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
175 output
, _rc
= await self
._local
_async
_exec
(
176 command
=command
, raise_exception_on_error
=True
179 self
.log
.info("Helm client already initialized")
181 self
.log
.info("Cluster initialized {}".format(cluster_uuid
))
183 return cluster_uuid
, n2vc_installed_sw
186 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
189 self
.log
.debug("adding {} repository {}. URL: {}".format(repo_type
, name
, url
))
192 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
193 cluster_name
=cluster_uuid
, create_if_not_exist
=True
197 command
= "{} --kubeconfig={} --home={} repo update".format(
198 self
._helm
_command
, config_filename
, helm_dir
200 self
.log
.debug("updating repo: {}".format(command
))
201 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
203 # helm repo add name url
204 command
= "{} --kubeconfig={} --home={} repo add {} {}".format(
205 self
._helm
_command
, config_filename
, helm_dir
, name
, url
207 self
.log
.debug("adding repo: {}".format(command
))
208 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
210 async def repo_list(self
, cluster_uuid
: str) -> list:
212 Get the list of registered repositories
214 :return: list of registered repositories: [ (name, url) .... ]
217 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
220 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
221 cluster_name
=cluster_uuid
, create_if_not_exist
=True
224 command
= "{} --kubeconfig={} --home={} repo list --output yaml".format(
225 self
._helm
_command
, config_filename
, helm_dir
228 output
, _rc
= await self
._local
_async
_exec
(
229 command
=command
, raise_exception_on_error
=True
231 if output
and len(output
) > 0:
232 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
236 async def repo_remove(self
, cluster_uuid
: str, name
: str):
238 Remove a repository from OSM
240 :param cluster_uuid: the cluster
241 :param name: repo name in OSM
242 :return: True if successful
245 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
248 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
249 cluster_name
=cluster_uuid
, create_if_not_exist
=True
252 command
= "{} --kubeconfig={} --home={} repo remove {}".format(
253 self
._helm
_command
, config_filename
, helm_dir
, name
256 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
259 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
263 "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid
)
266 # get kube and helm directories
267 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
268 cluster_name
=cluster_uuid
, create_if_not_exist
=False
271 # uninstall releases if needed
272 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
273 if len(releases
) > 0:
277 kdu_instance
= r
.get("Name")
278 chart
= r
.get("Chart")
280 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
282 await self
.uninstall(
283 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
285 except Exception as e
:
287 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
291 "Cluster has releases and not force. Cannot reset K8s "
292 "environment. Cluster uuid: {}"
293 ).format(cluster_uuid
)
295 raise K8sException(msg
)
299 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_uuid
))
301 # find namespace for tiller pod
302 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
303 self
.kubectl_command
, config_filename
305 output
, _rc
= await self
._local
_async
_exec
(
306 command
=command
, raise_exception_on_error
=False
308 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
310 for r
in output_table
:
312 if "tiller-deploy" in r
[1]:
318 msg
= "Tiller deployment not found in cluster {}".format(cluster_uuid
)
321 self
.log
.debug("namespace for tiller: {}".format(namespace
))
323 force_str
= "--force"
326 # delete tiller deployment
328 "Deleting tiller deployment for cluster {}, namespace {}".format(
329 cluster_uuid
, namespace
333 "{} --namespace {} --kubeconfig={} {} delete deployment "
335 ).format(self
.kubectl_command
, namespace
, config_filename
, force_str
)
336 await self
._local
_async
_exec
(
337 command
=command
, raise_exception_on_error
=False
340 # uninstall tiller from cluster
342 "Uninstalling tiller from cluster {}".format(cluster_uuid
)
344 command
= "{} --kubeconfig={} --home={} reset".format(
345 self
._helm
_command
, config_filename
, helm_dir
347 self
.log
.debug("resetting: {}".format(command
))
348 output
, _rc
= await self
._local
_async
_exec
(
349 command
=command
, raise_exception_on_error
=True
352 self
.log
.debug("namespace not found")
354 # delete cluster directory
355 direct
= self
.fs
.path
+ "/" + cluster_uuid
356 self
.log
.debug("Removing directory {}".format(direct
))
357 shutil
.rmtree(direct
, ignore_errors
=True)
366 timeout
: float = 300,
368 db_dict
: dict = None,
369 kdu_name
: str = None,
370 namespace
: str = None,
373 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
376 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
377 cluster_name
=cluster_uuid
, create_if_not_exist
=True
381 # params_str = K8sHelmConnector._params_to_set_option(params)
382 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
383 cluster_uuid
=cluster_uuid
, params
=params
388 timeout_str
= "--timeout {}".format(timeout
)
393 atomic_str
= "--atomic"
397 namespace_str
= "--namespace {}".format(namespace
)
402 parts
= kdu_model
.split(sep
=":")
404 version_str
= "--version {}".format(parts
[1])
407 # generate a name for the release. Then, check if already exists
409 while kdu_instance
is None:
410 kdu_instance
= K8sHelmConnector
._generate
_release
_name
(kdu_model
)
412 result
= await self
._status
_kdu
(
413 cluster_uuid
=cluster_uuid
,
414 kdu_instance
=kdu_instance
,
415 show_error_log
=False,
417 if result
is not None:
418 # instance already exists: generate a new one
425 "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
426 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
427 helm
=self
._helm
_command
,
429 config
=config_filename
,
439 self
.log
.debug("installing: {}".format(command
))
442 # exec helm in a task
443 exec_task
= asyncio
.ensure_future(
444 coro_or_future
=self
._local
_async
_exec
(
445 command
=command
, raise_exception_on_error
=False
449 # write status in another task
450 status_task
= asyncio
.ensure_future(
451 coro_or_future
=self
._store
_status
(
452 cluster_uuid
=cluster_uuid
,
453 kdu_instance
=kdu_instance
,
460 # wait for execution task
461 await asyncio
.wait([exec_task
])
466 output
, rc
= exec_task
.result()
470 output
, rc
= await self
._local
_async
_exec
(
471 command
=command
, raise_exception_on_error
=False
474 # remove temporal values yaml file
476 os
.remove(file_to_delete
)
479 await self
._store
_status
(
480 cluster_uuid
=cluster_uuid
,
481 kdu_instance
=kdu_instance
,
489 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
491 raise K8sException(msg
)
493 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
496 async def instances_list(self
, cluster_uuid
: str) -> list:
498 returns a list of deployed releases in a cluster
500 :param cluster_uuid: the cluster
504 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
507 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
508 cluster_name
=cluster_uuid
, create_if_not_exist
=True
511 command
= "{} --kubeconfig={} --home={} list --output yaml".format(
512 self
._helm
_command
, config_filename
, helm_dir
515 output
, _rc
= await self
._local
_async
_exec
(
516 command
=command
, raise_exception_on_error
=True
519 if output
and len(output
) > 0:
520 return yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
528 kdu_model
: str = None,
530 timeout
: float = 300,
532 db_dict
: dict = None,
535 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
538 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
539 cluster_name
=cluster_uuid
, create_if_not_exist
=True
543 # params_str = K8sHelmConnector._params_to_set_option(params)
544 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
545 cluster_uuid
=cluster_uuid
, params
=params
550 timeout_str
= "--timeout {}".format(timeout
)
555 atomic_str
= "--atomic"
559 if kdu_model
and ":" in kdu_model
:
560 parts
= kdu_model
.split(sep
=":")
562 version_str
= "--version {}".format(parts
[1])
567 "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
579 self
.log
.debug("upgrading: {}".format(command
))
583 # exec helm in a task
584 exec_task
= asyncio
.ensure_future(
585 coro_or_future
=self
._local
_async
_exec
(
586 command
=command
, raise_exception_on_error
=False
589 # write status in another task
590 status_task
= asyncio
.ensure_future(
591 coro_or_future
=self
._store
_status
(
592 cluster_uuid
=cluster_uuid
,
593 kdu_instance
=kdu_instance
,
600 # wait for execution task
601 await asyncio
.wait([exec_task
])
605 output
, rc
= exec_task
.result()
609 output
, rc
= await self
._local
_async
_exec
(
610 command
=command
, raise_exception_on_error
=False
613 # remove temporal values yaml file
615 os
.remove(file_to_delete
)
618 await self
._store
_status
(
619 cluster_uuid
=cluster_uuid
,
620 kdu_instance
=kdu_instance
,
628 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
630 raise K8sException(msg
)
632 # return new revision number
633 instance
= await self
.get_instance_info(
634 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
637 revision
= int(instance
.get("Revision"))
638 self
.log
.debug("New revision: {}".format(revision
))
644 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
648 "rollback kdu_instance {} to revision {} from cluster {}".format(
649 kdu_instance
, revision
, cluster_uuid
654 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
655 cluster_name
=cluster_uuid
, create_if_not_exist
=True
658 command
= "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
659 self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
, revision
662 # exec helm in a task
663 exec_task
= asyncio
.ensure_future(
664 coro_or_future
=self
._local
_async
_exec
(
665 command
=command
, raise_exception_on_error
=False
668 # write status in another task
669 status_task
= asyncio
.ensure_future(
670 coro_or_future
=self
._store
_status
(
671 cluster_uuid
=cluster_uuid
,
672 kdu_instance
=kdu_instance
,
674 operation
="rollback",
679 # wait for execution task
680 await asyncio
.wait([exec_task
])
685 output
, rc
= exec_task
.result()
688 await self
._store
_status
(
689 cluster_uuid
=cluster_uuid
,
690 kdu_instance
=kdu_instance
,
692 operation
="rollback",
698 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
700 raise K8sException(msg
)
702 # return new revision number
703 instance
= await self
.get_instance_info(
704 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
707 revision
= int(instance
.get("Revision"))
708 self
.log
.debug("New revision: {}".format(revision
))
713 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str):
715 Removes an existing KDU instance. It would implicitly use the `delete` call
716 (this call would happen after all _terminate-config-primitive_ of the VNF
719 :param cluster_uuid: UUID of a K8s cluster known by OSM
720 :param kdu_instance: unique name for the KDU instance to be deleted
721 :return: True if successful
725 "uninstall kdu_instance {} from cluster {}".format(
726 kdu_instance
, cluster_uuid
731 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
732 cluster_name
=cluster_uuid
, create_if_not_exist
=True
735 command
= "{} --kubeconfig={} --home={} delete --purge {}".format(
736 self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
739 output
, _rc
= await self
._local
_async
_exec
(
740 command
=command
, raise_exception_on_error
=True
743 return self
._output
_to
_table
(output
)
745 async def exec_primitive(
747 cluster_uuid
: str = None,
748 kdu_instance
: str = None,
749 primitive_name
: str = None,
750 timeout
: float = 300,
752 db_dict
: dict = None,
754 """Exec primitive (Juju action)
756 :param cluster_uuid str: The UUID of the cluster
757 :param kdu_instance str: The unique name of the KDU instance
758 :param primitive_name: Name of action that will be executed
759 :param timeout: Timeout for action execution
760 :param params: Dictionary of all the parameters needed for the action
761 :db_dict: Dictionary for any additional data
763 :return: Returns the output of the action
766 "KDUs deployed with Helm don't support actions "
767 "different from rollback, upgrade and status"
770 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
773 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
776 return await self
._exec
_inspect
_comand
(
777 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
780 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
783 "inspect kdu_model values {} from (optional) repo: {}".format(
788 return await self
._exec
_inspect
_comand
(
789 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
792 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
795 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
798 return await self
._exec
_inspect
_comand
(
799 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
802 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str) -> str:
804 # call internal function
805 return await self
._status
_kdu
(
806 cluster_uuid
=cluster_uuid
,
807 kdu_instance
=kdu_instance
,
812 async def synchronize_repos(self
, cluster_uuid
: str):
814 self
.log
.debug("syncronize repos for cluster helm-id: {}",)
816 update_repos_timeout
= (
817 300 # max timeout to sync a single repos, more than this is too much
819 db_k8scluster
= self
.db
.get_one(
820 "k8sclusters", {"_admin.helm-chart.id": cluster_uuid
}
824 db_k8scluster
.get("_admin").get("helm_chart_repos") or []
826 cluster_repo_dict
= (
827 db_k8scluster
.get("_admin").get("helm_charts_added") or {}
829 # elements that must be deleted
830 deleted_repo_list
= []
832 self
.log
.debug("helm_chart_repos: {}".format(nbi_repo_list
))
833 self
.log
.debug("helm_charts_added: {}".format(cluster_repo_dict
))
835 # obtain repos to add: registered by nbi but not added
837 repo
for repo
in nbi_repo_list
if not cluster_repo_dict
.get(repo
)
840 # obtain repos to delete: added by cluster but not in nbi list
843 for repo
in cluster_repo_dict
.keys()
844 if repo
not in nbi_repo_list
847 # delete repos: must delete first then add because there may be
848 # different repos with same name but
849 # different id and url
850 self
.log
.debug("repos to delete: {}".format(repos_to_delete
))
851 for repo_id
in repos_to_delete
:
852 # try to delete repos
854 repo_delete_task
= asyncio
.ensure_future(
856 cluster_uuid
=cluster_uuid
,
857 name
=cluster_repo_dict
[repo_id
],
860 await asyncio
.wait_for(repo_delete_task
, update_repos_timeout
)
861 except Exception as e
:
863 "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
864 repo_id
, cluster_repo_dict
[repo_id
], str(e
)
867 # always add to the list of to_delete if there is an error
868 # because if is not there
869 # deleting raises error
870 deleted_repo_list
.append(repo_id
)
873 self
.log
.debug("repos to add: {}".format(repos_to_add
))
874 for repo_id
in repos_to_add
:
875 # obtain the repo data from the db
876 # if there is an error getting the repo in the database we will
877 # ignore this repo and continue
878 # because there is a possible race condition where the repo has
879 # been deleted while processing
880 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
882 "obtained repo: id, {}, name: {}, url: {}".format(
883 repo_id
, db_repo
["name"], db_repo
["url"]
887 repo_add_task
= asyncio
.ensure_future(
889 cluster_uuid
=cluster_uuid
,
890 name
=db_repo
["name"],
895 await asyncio
.wait_for(repo_add_task
, update_repos_timeout
)
896 added_repo_dict
[repo_id
] = db_repo
["name"]
898 "added repo: id, {}, name: {}".format(
899 repo_id
, db_repo
["name"]
902 except Exception as e
:
903 # deal with error adding repo, adding a repo that already
904 # exists does not raise any error
905 # will not raise error because a wrong repos added by
906 # anyone could prevent instantiating any ns
908 "Error adding repo id: {}, err_msg: {} ".format(
913 return deleted_repo_list
, added_repo_dict
915 else: # else db_k8scluster does not exist
917 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
920 except Exception as e
:
921 self
.log
.error("Error synchronizing repos: {}".format(str(e
)))
922 raise K8sException("Error synchronizing repos")
925 ####################################################################################
926 ################################### P R I V A T E ##################################
927 ####################################################################################
930 async def _exec_inspect_comand(
931 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
936 repo_str
= " --repo {}".format(repo_url
)
937 idx
= kdu_model
.find("/")
940 kdu_model
= kdu_model
[idx
:]
942 inspect_command
= "{} inspect {} {}{}".format(
943 self
._helm
_command
, inspect_command
, kdu_model
, repo_str
945 output
, _rc
= await self
._local
_async
_exec
(
946 command
=inspect_command
, encode_utf8
=True
951 async def _status_kdu(
955 show_error_log
: bool = False,
956 return_text
: bool = False,
959 self
.log
.debug("status of kdu_instance {}".format(kdu_instance
))
962 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
963 cluster_name
=cluster_uuid
, create_if_not_exist
=True
966 command
= "{} --kubeconfig={} --home={} status {} --output yaml".format(
967 self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
970 output
, rc
= await self
._local
_async
_exec
(
972 raise_exception_on_error
=True,
973 show_error_log
=show_error_log
,
982 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
984 # remove field 'notes'
986 del data
.get("info").get("status")["notes"]
990 # parse field 'resources'
992 resources
= str(data
.get("info").get("status").get("resources"))
993 resource_table
= self
._output
_to
_table
(resources
)
994 data
.get("info").get("status")["resources"] = resource_table
1000 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
1001 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
1002 for instance
in instances
:
1003 if instance
.get("Name") == kdu_instance
:
1005 self
.log
.debug("Instance {} not found".format(kdu_instance
))
1009 def _generate_release_name(chart_name
: str):
1010 # check embeded chart (file or dir)
1011 if chart_name
.startswith("/"):
1012 # extract file or directory name
1013 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1015 elif "://" in chart_name
:
1016 # extract last portion of URL
1017 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1020 for c
in chart_name
:
1021 if c
.isalpha() or c
.isnumeric():
1028 # if does not start with alpha character, prefix 'a'
1029 if not name
[0].isalpha():
1034 def get_random_number():
1035 r
= random
.randrange(start
=1, stop
=99999999)
1037 s
= s
.rjust(10, "0")
1040 name
= name
+ get_random_number()
1043 async def _store_status(
1048 check_every
: float = 10,
1049 db_dict
: dict = None,
1050 run_once
: bool = False,
1054 await asyncio
.sleep(check_every
)
1055 detailed_status
= await self
.status_kdu(
1056 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
1058 status
= detailed_status
.get("info").get("Description")
1059 self
.log
.debug("STATUS:\n{}".format(status
))
1060 self
.log
.debug("DETAILED STATUS:\n{}".format(detailed_status
))
1061 # write status to db
1062 result
= await self
.write_app_status_to_db(
1065 detailed_status
=str(detailed_status
),
1066 operation
=operation
,
1069 self
.log
.info("Error writing in database. Task exiting...")
1071 except asyncio
.CancelledError
:
1072 self
.log
.debug("Task cancelled")
1074 except Exception as e
:
1075 self
.log
.debug("_store_status exception: {}".format(str(e
)))
1081 async def _is_install_completed(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
1083 status
= await self
._status
_kdu
(
1084 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
, return_text
=False
1087 # extract info.status.resources-> str
1090 # NAME READY UP-TO-DATE AVAILABLE AGE
1091 # halting-horse-mongodb 0/1 1 0 0s
1092 # halting-petit-mongodb 1/1 1 0 0s
1094 resources
= K8sHelmConnector
._get
_deep
(status
, ("info", "status", "resources"))
1097 resources
= K8sHelmConnector
._output
_to
_table
(resources
)
1099 num_lines
= len(resources
)
1101 while index
< num_lines
:
1103 line1
= resources
[index
]
1105 # find '==>' in column 0
1106 if line1
[0] == "==>":
1107 line2
= resources
[index
]
1109 # find READY in column 1
1110 if line2
[1] == "READY":
1112 line3
= resources
[index
]
1114 while len(line3
) > 1 and index
< num_lines
:
1115 ready_value
= line3
[1]
1116 parts
= ready_value
.split(sep
="/")
1117 current
= int(parts
[0])
1118 total
= int(parts
[1])
1120 self
.log
.debug("NOT READY:\n {}".format(line3
))
1122 line3
= resources
[index
]
1131 def _get_deep(dictionary
: dict, members
: tuple):
1136 value
= target
.get(m
)
1145 # find key:value in several lines
1147 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1148 for line
in p_lines
:
1150 if line
.startswith(p_key
+ ":"):
1151 parts
= line
.split(":")
1152 the_value
= parts
[1].strip()
1159 # params for use in -f file
1160 # returns values file option and filename (in order to delete it at the end)
1161 def _params_to_file_option(self
, cluster_uuid
: str, params
: dict) -> (str, str):
1163 if params
and len(params
) > 0:
1164 self
._get
_paths
(cluster_name
=cluster_uuid
, create_if_not_exist
=True)
1166 def get_random_number():
1167 r
= random
.randrange(start
=1, stop
=99999999)
1175 value
= params
.get(key
)
1176 if "!!yaml" in str(value
):
1177 value
= yaml
.load(value
[7:])
1178 params2
[key
] = value
1180 values_file
= get_random_number() + ".yaml"
1181 with
open(values_file
, "w") as stream
:
1182 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1184 return "-f {}".format(values_file
), values_file
1188 # params for use in --set option
1190 def _params_to_set_option(params
: dict) -> str:
1192 if params
and len(params
) > 0:
1195 value
= params
.get(key
, None)
1196 if value
is not None:
1198 params_str
+= "--set "
1202 params_str
+= "{}={}".format(key
, value
)
1206 def _output_to_lines(output
: str) -> list:
1207 output_lines
= list()
1208 lines
= output
.splitlines(keepends
=False)
1212 output_lines
.append(line
)
1216 def _output_to_table(output
: str) -> list:
1217 output_table
= list()
1218 lines
= output
.splitlines(keepends
=False)
1220 line
= line
.replace("\t", " ")
1222 output_table
.append(line_list
)
1223 cells
= line
.split(sep
=" ")
1227 line_list
.append(cell
)
1231 self
, cluster_name
: str, create_if_not_exist
: bool = False
1232 ) -> (str, str, str, str):
1234 Returns kube and helm directories
1236 :param cluster_name:
1237 :param create_if_not_exist:
1238 :return: kube, helm directories, config filename and cluster dir.
1239 Raises exception if not exist and cannot create
1243 if base
.endswith("/") or base
.endswith("\\"):
1246 # base dir for cluster
1247 cluster_dir
= base
+ "/" + cluster_name
1248 if create_if_not_exist
and not os
.path
.exists(cluster_dir
):
1249 self
.log
.debug("Creating dir {}".format(cluster_dir
))
1250 os
.makedirs(cluster_dir
)
1251 if not os
.path
.exists(cluster_dir
):
1252 msg
= "Base cluster dir {} does not exist".format(cluster_dir
)
1254 raise K8sException(msg
)
1257 kube_dir
= cluster_dir
+ "/" + ".kube"
1258 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
1259 self
.log
.debug("Creating dir {}".format(kube_dir
))
1260 os
.makedirs(kube_dir
)
1261 if not os
.path
.exists(kube_dir
):
1262 msg
= "Kube config dir {} does not exist".format(kube_dir
)
1264 raise K8sException(msg
)
1267 helm_dir
= cluster_dir
+ "/" + ".helm"
1268 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
1269 self
.log
.debug("Creating dir {}".format(helm_dir
))
1270 os
.makedirs(helm_dir
)
1271 if not os
.path
.exists(helm_dir
):
1272 msg
= "Helm config dir {} does not exist".format(helm_dir
)
1274 raise K8sException(msg
)
1276 config_filename
= kube_dir
+ "/config"
1277 return kube_dir
, helm_dir
, config_filename
, cluster_dir
1280 def _remove_multiple_spaces(strobj
):
1281 strobj
= strobj
.strip()
1282 while " " in strobj
:
1283 strobj
= strobj
.replace(" ", " ")
1286 def _local_exec(self
, command
: str) -> (str, int):
1287 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1288 self
.log
.debug("Executing sync local command: {}".format(command
))
1289 # raise exception if fails
1292 output
= subprocess
.check_output(
1293 command
, shell
=True, universal_newlines
=True
1296 self
.log
.debug(output
)
1300 return output
, return_code
1302 async def _local_async_exec(
1305 raise_exception_on_error
: bool = False,
1306 show_error_log
: bool = True,
1307 encode_utf8
: bool = False,
1310 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1311 self
.log
.debug("Executing async local command: {}".format(command
))
1314 command
= command
.split(sep
=" ")
1317 process
= await asyncio
.create_subprocess_exec(
1318 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
1321 # wait for command terminate
1322 stdout
, stderr
= await process
.communicate()
1324 return_code
= process
.returncode
1328 output
= stdout
.decode("utf-8").strip()
1329 # output = stdout.decode()
1331 output
= stderr
.decode("utf-8").strip()
1332 # output = stderr.decode()
1334 if return_code
!= 0 and show_error_log
:
1336 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1339 self
.log
.debug("Return code: {}".format(return_code
))
1341 if raise_exception_on_error
and return_code
!= 0:
1342 raise K8sException(output
)
1345 output
= output
.encode("utf-8").strip()
1346 output
= str(output
).replace("\\n", "\n")
1348 return output
, return_code
1350 except asyncio
.CancelledError
:
1352 except K8sException
:
1354 except Exception as e
:
1355 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1357 if raise_exception_on_error
:
1358 raise K8sException(e
) from e
1362 def _check_file_exists(self
, filename
: str, exception_if_not_exists
: bool = False):
1363 # self.log.debug('Checking if file {} exists...'.format(filename))
1364 if os
.path
.exists(filename
):
1367 msg
= "File {} does not exist".format(filename
)
1368 if exception_if_not_exists
:
1369 # self.log.error(msg)
1370 raise K8sException(msg
)