9a0908f39341650f133f31006da179c2b6ad98a2
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
29 from uuid
import uuid4
31 from n2vc
.exceptions
import K8sException
32 from n2vc
.k8s_conn
import K8sConnector
36 class K8sHelmConnector(K8sConnector
):
39 ####################################################################################
40 ################################### P U B L I C ####################################
41 ####################################################################################
43 service_account
= "osm"
44 _STABLE_REPO_URL
= "https://charts.helm.sh/stable"
50 kubectl_command
: str = "/usr/bin/kubectl",
51 helm_command
: str = "/usr/bin/helm",
54 vca_config
: dict = None,
58 :param fs: file system for kubernetes and helm configuration
59 :param db: database object to write current operation status
60 :param kubectl_command: path to kubectl executable
61 :param helm_command: path to helm executable
63 :param on_update_db: callback called when k8s connector updates database
67 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
69 self
.log
.info("Initializing K8S Helm connector")
71 # random numbers for release name generation
72 random
.seed(time
.time())
77 # exception if kubectl is not installed
78 self
.kubectl_command
= kubectl_command
79 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
81 # exception if helm is not installed
82 self
._helm
_command
= helm_command
83 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
85 # obtain stable repo url from config or apply default
86 if not vca_config
or not vca_config
.get("stablerepourl"):
87 self
._stable
_repo
_url
= self
._STABLE
_REPO
_URL
89 self
._stable
_repo
_url
= vca_config
.get("stablerepourl")
91 # initialize helm client-only
92 self
.log
.debug("Initializing helm client-only...")
93 command
= "{} init --client-only --stable-repo-url {}".format(
94 self
._helm
_command
, self
._stable
_repo
_url
)
96 asyncio
.ensure_future(
97 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
99 # loop = asyncio.get_event_loop()
100 # loop.run_until_complete(self._local_async_exec(command=command,
101 # raise_exception_on_error=False))
102 except Exception as e
:
104 msg
="helm init failed (it was already initialized): {}".format(e
)
107 self
.log
.info("K8S Helm connector initialized")
110 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
112 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
113 cluster_id for backward compatibility
115 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(':')
116 return namespace
, cluster_id
119 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None
122 It prepares a given K8s cluster environment to run Charts on both sides:
126 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
128 :param namespace: optional namespace to be used for helm. By default,
129 'kube-system' will be used
130 :param reuse_cluster_uuid: existing cluster uuid for reuse
131 :return: uuid of the K8s cluster and True if connector has installed some
132 software in the cluster
133 (on error, an exception will be raised)
136 if reuse_cluster_uuid
:
137 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
138 namespace
= namespace_
or namespace
140 cluster_id
= str(uuid4())
141 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
143 self
.log
.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
))
145 # create config filename
146 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
147 cluster_name
=cluster_id
, create_if_not_exist
=True
149 with
open(config_filename
, "w") as f
:
152 # check if tiller pod is up in cluster
153 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
154 self
.kubectl_command
, config_filename
, namespace
156 output
, _rc
= await self
._local
_async
_exec
(
157 command
=command
, raise_exception_on_error
=True
160 output_table
= self
._output
_to
_table
(output
=output
)
162 # find 'tiller' pod in all pods
163 already_initialized
= False
165 for row
in output_table
:
166 if row
[0].startswith("tiller-deploy"):
167 already_initialized
= True
173 n2vc_installed_sw
= False
174 if not already_initialized
:
176 "Initializing helm in client and server: {}".format(cluster_id
)
178 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
179 self
.kubectl_command
, config_filename
, self
.service_account
)
180 _
, _rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
182 command
= ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
183 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
184 ).format(self
.kubectl_command
, config_filename
, self
.service_account
)
185 _
, _rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
187 command
= ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
188 " --stable-repo-url {} init").format(self
._helm
_command
,
191 self
.service_account
,
192 self
._stable
_repo
_url
)
193 _
, _rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
194 n2vc_installed_sw
= True
196 # check client helm installation
197 check_file
= helm_dir
+ "/repository/repositories.yaml"
198 if not self
._check
_file
_exists
(filename
=check_file
, exception_if_not_exists
=False):
199 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
201 "{} --kubeconfig={} --tiller-namespace={} "
202 "--home={} init --client-only --stable-repo-url {} "
203 ).format(self
._helm
_command
, config_filename
, namespace
,
204 helm_dir
, self
._stable
_repo
_url
)
205 output
, _rc
= await self
._local
_async
_exec
(
206 command
=command
, raise_exception_on_error
=True
209 self
.log
.info("Helm client already initialized")
211 # remove old stable repo and add new one
212 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
213 repo_list
= await self
.repo_list(cluster_uuid
)
214 for repo
in repo_list
:
215 if repo
["Name"] == "stable" and repo
["URL"] != self
._stable
_repo
_url
:
216 self
.log
.debug("Add new stable repo url: {}")
217 await self
.repo_remove(cluster_uuid
,
219 await self
.repo_add(cluster_uuid
,
221 self
._stable
_repo
_url
)
224 self
.log
.info("Cluster {} initialized".format(cluster_id
))
226 return cluster_uuid
, n2vc_installed_sw
229 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
231 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
232 self
.log
.debug("Cluster {}, adding {} repository {}. URL: {}".format(
233 cluster_id
, repo_type
, name
, url
))
236 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
237 cluster_name
=cluster_id
, create_if_not_exist
=True
241 command
= "{} --kubeconfig={} --home={} repo update".format(
242 self
._helm
_command
, config_filename
, helm_dir
244 self
.log
.debug("updating repo: {}".format(command
))
245 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
247 # helm repo add name url
248 command
= "{} --kubeconfig={} --home={} repo add {} {}".format(
249 self
._helm
_command
, config_filename
, helm_dir
, name
, url
251 self
.log
.debug("adding repo: {}".format(command
))
252 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories.

    :param cluster_uuid: the cluster or 'namespace:cluster'
    :return: list of registered repositories: [ (name, url) .... ]
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # obtain cluster-local helm home and kubeconfig paths
    _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
        self._helm_command, config_filename, helm_dir
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True
    )
    if output and len(output) > 0:
        # helm prints the repo list as YAML; SafeLoader avoids arbitrary
        # object construction from command output
        return yaml.load(output, Loader=yaml.SafeLoader)
    else:
        # no repositories configured for this cluster
        return []
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from OSM.

    :param cluster_uuid: the cluster or 'namespace:cluster'
    :param name: repo name in OSM
    :return: True if successful
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    # fixed copy-pasted message ("list repositories") to reflect the action
    self.log.debug("remove repository {} for cluster {}".format(name, cluster_id))

    # obtain cluster-local helm home and kubeconfig paths
    _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --home={} repo remove {}".format(
        self._helm_command, config_filename, helm_dir, name
    )
    await self._local_async_exec(command=command, raise_exception_on_error=True)
305 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
308 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
309 self
.log
.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
310 .format(cluster_id
, uninstall_sw
))
312 # get kube and helm directories
313 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
314 cluster_name
=cluster_id
, create_if_not_exist
=False
317 # uninstall releases if needed.
319 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
320 if len(releases
) > 0:
324 kdu_instance
= r
.get("Name")
325 chart
= r
.get("Chart")
327 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
329 await self
.uninstall(
330 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
332 except Exception as e
:
334 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
338 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
341 uninstall_sw
= False # Allow to remove k8s cluster without removing Tiller
345 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
348 # find namespace for tiller pod
349 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
350 self
.kubectl_command
, config_filename
352 output
, _rc
= await self
._local
_async
_exec
(
353 command
=command
, raise_exception_on_error
=False
355 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
357 for r
in output_table
:
359 if "tiller-deploy" in r
[1]:
365 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
368 self
.log
.debug("namespace for tiller: {}".format(namespace
))
371 # uninstall tiller from cluster
373 "Uninstalling tiller from cluster {}".format(cluster_id
)
375 command
= "{} --kubeconfig={} --home={} reset".format(
376 self
._helm
_command
, config_filename
, helm_dir
378 self
.log
.debug("resetting: {}".format(command
))
379 output
, _rc
= await self
._local
_async
_exec
(
380 command
=command
, raise_exception_on_error
=True
382 # Delete clusterrolebinding and serviceaccount.
383 # Ignore if errors for backward compatibility
384 command
= ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
385 "io/osm-tiller-cluster-rule").format(self
.kubectl_command
,
387 output
, _rc
= await self
._local
_async
_exec
(command
=command
,
388 raise_exception_on_error
=False)
389 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
390 format(self
.kubectl_command
, config_filename
, self
.service_account
)
391 output
, _rc
= await self
._local
_async
_exec
(command
=command
,
392 raise_exception_on_error
=False)
395 self
.log
.debug("namespace not found")
397 # delete cluster directory
398 direct
= self
.fs
.path
+ "/" + cluster_id
399 self
.log
.debug("Removing directory {}".format(direct
))
400 shutil
.rmtree(direct
, ignore_errors
=True)
409 timeout
: float = 300,
411 db_dict
: dict = None,
412 kdu_name
: str = None,
413 namespace
: str = None,
416 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
417 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
420 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
421 cluster_name
=cluster_id
, create_if_not_exist
=True
425 # params_str = K8sHelmConnector._params_to_set_option(params)
426 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
427 cluster_id
=cluster_id
, params
=params
432 timeout_str
= "--timeout {}".format(timeout
)
437 atomic_str
= "--atomic"
441 namespace_str
= "--namespace {}".format(namespace
)
446 parts
= kdu_model
.split(sep
=":")
448 version_str
= "--version {}".format(parts
[1])
451 # generate a name for the release. Then, check if already exists
453 while kdu_instance
is None:
454 kdu_instance
= K8sHelmConnector
._generate
_release
_name
(kdu_model
)
456 result
= await self
._status
_kdu
(
457 cluster_id
=cluster_id
,
458 kdu_instance
=kdu_instance
,
459 show_error_log
=False,
461 if result
is not None:
462 # instance already exists: generate a new one
469 "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
470 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
471 helm
=self
._helm
_command
,
473 config
=config_filename
,
483 self
.log
.debug("installing: {}".format(command
))
486 # exec helm in a task
487 exec_task
= asyncio
.ensure_future(
488 coro_or_future
=self
._local
_async
_exec
(
489 command
=command
, raise_exception_on_error
=False
493 # write status in another task
494 status_task
= asyncio
.ensure_future(
495 coro_or_future
=self
._store
_status
(
496 cluster_id
=cluster_id
,
497 kdu_instance
=kdu_instance
,
504 # wait for execution task
505 await asyncio
.wait([exec_task
])
510 output
, rc
= exec_task
.result()
514 output
, rc
= await self
._local
_async
_exec
(
515 command
=command
, raise_exception_on_error
=False
518 # remove temporal values yaml file
520 os
.remove(file_to_delete
)
523 await self
._store
_status
(
524 cluster_id
=cluster_id
,
525 kdu_instance
=kdu_instance
,
533 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
535 raise K8sException(msg
)
537 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Returns a list of deployed releases in a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: list of release dicts as reported by `helm list`
        (empty list when there is no output)
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # obtain cluster-local helm home and kubeconfig paths
    _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --home={} list --output yaml".format(
        self._helm_command, config_filename, helm_dir
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True
    )

    if output and len(output) > 0:
        # helm v2 wraps the result under the 'Releases' key
        return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
    else:
        return []
573 kdu_model
: str = None,
575 timeout
: float = 300,
577 db_dict
: dict = None,
580 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
581 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
584 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
585 cluster_name
=cluster_id
, create_if_not_exist
=True
589 # params_str = K8sHelmConnector._params_to_set_option(params)
590 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
591 cluster_id
=cluster_id
, params
=params
596 timeout_str
= "--timeout {}".format(timeout
)
601 atomic_str
= "--atomic"
605 if kdu_model
and ":" in kdu_model
:
606 parts
= kdu_model
.split(sep
=":")
608 version_str
= "--version {}".format(parts
[1])
613 "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
625 self
.log
.debug("upgrading: {}".format(command
))
629 # exec helm in a task
630 exec_task
= asyncio
.ensure_future(
631 coro_or_future
=self
._local
_async
_exec
(
632 command
=command
, raise_exception_on_error
=False
635 # write status in another task
636 status_task
= asyncio
.ensure_future(
637 coro_or_future
=self
._store
_status
(
638 cluster_id
=cluster_id
,
639 kdu_instance
=kdu_instance
,
646 # wait for execution task
647 await asyncio
.wait([exec_task
])
651 output
, rc
= exec_task
.result()
655 output
, rc
= await self
._local
_async
_exec
(
656 command
=command
, raise_exception_on_error
=False
659 # remove temporal values yaml file
661 os
.remove(file_to_delete
)
664 await self
._store
_status
(
665 cluster_id
=cluster_id
,
666 kdu_instance
=kdu_instance
,
674 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
676 raise K8sException(msg
)
678 # return new revision number
679 instance
= await self
.get_instance_info(
680 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
683 revision
= int(instance
.get("Revision"))
684 self
.log
.debug("New revision: {}".format(revision
))
690 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
693 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
695 "rollback kdu_instance {} to revision {} from cluster {}".format(
696 kdu_instance
, revision
, cluster_id
701 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
702 cluster_name
=cluster_id
, create_if_not_exist
=True
705 command
= "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
706 self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
, revision
709 # exec helm in a task
710 exec_task
= asyncio
.ensure_future(
711 coro_or_future
=self
._local
_async
_exec
(
712 command
=command
, raise_exception_on_error
=False
715 # write status in another task
716 status_task
= asyncio
.ensure_future(
717 coro_or_future
=self
._store
_status
(
718 cluster_id
=cluster_id
,
719 kdu_instance
=kdu_instance
,
721 operation
="rollback",
726 # wait for execution task
727 await asyncio
.wait([exec_task
])
732 output
, rc
= exec_task
.result()
735 await self
._store
_status
(
736 cluster_id
=cluster_id
,
737 kdu_instance
=kdu_instance
,
739 operation
="rollback",
745 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
747 raise K8sException(msg
)
749 # return new revision number
750 instance
= await self
.get_instance_info(
751 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
754 revision
= int(instance
.get("Revision"))
755 self
.log
.debug("New revision: {}".format(revision
))
async def uninstall(self, cluster_uuid: str, kdu_instance: str):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` call
    (this call would happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :return: True if successful
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )

    # obtain cluster-local helm home and kubeconfig paths
    _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # --purge also removes the release name so it can be reused
    command = "{} --kubeconfig={} --home={} delete --purge {}".format(
        self._helm_command, config_filename, helm_dir, kdu_instance
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True
    )

    return self._output_to_table(output)
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action). Not supported for Helm-deployed KDUs.

    :param cluster_uuid str: The UUID of the cluster or namespace:cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :db_dict: Dictionary for any additional data

    :return: Returns the output of the action
    """
    # Helm charts only support the standard lifecycle operations, so any
    # other primitive is rejected unconditionally
    raise K8sException(
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the full `helm inspect` output for a chart.

    :param kdu_model: chart reference (repo/chart, URL or local path)
    :param repo_url: optional repository URL to resolve the chart against
    :return: raw text produced by the helm command
    """
    self.log.debug(
        "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
    )

    # empty sub-command means plain 'helm inspect' (chart + values + readme)
    return await self._exec_inspect_comand(
        inspect_command="", kdu_model=kdu_model, repo_url=repo_url
    )
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the chart default values (`helm inspect values`).

    :param kdu_model: chart reference (repo/chart, URL or local path)
    :param repo_url: optional repository URL to resolve the chart against
    :return: raw text produced by the helm command
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    return await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the chart README (`helm inspect readme`).

    :param kdu_model: chart reference (repo/chart, URL or local path)
    :param repo_url: optional repository URL to resolve the chart against
    :return: raw text produced by the helm command
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    return await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
    """
    Return the status of a deployed release as text.

    :param cluster_uuid: the cluster or 'namespace:cluster'
    :param kdu_instance: unique name of the deployed release
    :return: textual status as produced by `helm status`
    """
    # call internal function
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    return await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        show_error_log=True,
        return_text=True,
    )
861 async def get_services(self
,
864 namespace
: str) -> list:
866 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
868 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
869 cluster_uuid
, kdu_instance
873 status
= await self
._status
_kdu
(
874 cluster_id
, kdu_instance
, return_text
=False
877 service_names
= self
._parse
_helm
_status
_service
_info
(status
)
879 for service
in service_names
:
880 service
= await self
.get_service(cluster_uuid
, service
, namespace
)
881 service_list
.append(service
)
885 async def get_service(self
,
888 namespace
: str) -> object:
891 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
892 service_name
, namespace
, cluster_uuid
)
896 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
897 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
898 cluster_name
=cluster_id
, create_if_not_exist
=True
901 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
902 self
.kubectl_command
, config_filename
, namespace
, service_name
905 output
, _rc
= await self
._local
_async
_exec
(
906 command
=command
, raise_exception_on_error
=True
909 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
912 "name": service_name
,
913 "type": self
._get
_deep
(data
, ("spec", "type")),
914 "ports": self
._get
_deep
(data
, ("spec", "ports")),
915 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP"))
917 if service
["type"] == "LoadBalancer":
918 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
919 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
920 service
["external_ip"] = ip_list
924 async def synchronize_repos(self
, cluster_uuid
: str):
926 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
927 self
.log
.debug("syncronize repos for cluster helm-id: {}".format(cluster_id
))
929 update_repos_timeout
= (
930 300 # max timeout to sync a single repos, more than this is too much
932 db_k8scluster
= self
.db
.get_one(
933 "k8sclusters", {"_admin.helm-chart.id": cluster_uuid
}
937 db_k8scluster
.get("_admin").get("helm_chart_repos") or []
939 cluster_repo_dict
= (
940 db_k8scluster
.get("_admin").get("helm_charts_added") or {}
942 # elements that must be deleted
943 deleted_repo_list
= []
945 # self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
946 # self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
948 # obtain repos to add: registered by nbi but not added
950 repo
for repo
in nbi_repo_list
if not cluster_repo_dict
.get(repo
)
953 # obtain repos to delete: added by cluster but not in nbi list
956 for repo
in cluster_repo_dict
.keys()
957 if repo
not in nbi_repo_list
960 # delete repos: must delete first then add because there may be
961 # different repos with same name but
962 # different id and url
964 self
.log
.debug("repos to delete: {}".format(repos_to_delete
))
965 for repo_id
in repos_to_delete
:
966 # try to delete repos
968 repo_delete_task
= asyncio
.ensure_future(
970 cluster_uuid
=cluster_uuid
,
971 name
=cluster_repo_dict
[repo_id
],
974 await asyncio
.wait_for(repo_delete_task
, update_repos_timeout
)
975 except Exception as e
:
977 "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
978 repo_id
, cluster_repo_dict
[repo_id
], str(e
)
981 # always add to the list of to_delete if there is an error
982 # because if is not there
983 # deleting raises error
984 deleted_repo_list
.append(repo_id
)
988 self
.log
.debug("repos to add: {}".format(repos_to_add
))
989 for repo_id
in repos_to_add
:
990 # obtain the repo data from the db
991 # if there is an error getting the repo in the database we will
992 # ignore this repo and continue
993 # because there is a possible race condition where the repo has
994 # been deleted while processing
995 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
997 "obtained repo: id, {}, name: {}, url: {}".format(
998 repo_id
, db_repo
["name"], db_repo
["url"]
1002 repo_add_task
= asyncio
.ensure_future(
1004 cluster_uuid
=cluster_uuid
,
1005 name
=db_repo
["name"],
1010 await asyncio
.wait_for(repo_add_task
, update_repos_timeout
)
1011 added_repo_dict
[repo_id
] = db_repo
["name"]
1013 "added repo: id, {}, name: {}".format(
1014 repo_id
, db_repo
["name"]
1017 except Exception as e
:
1018 # deal with error adding repo, adding a repo that already
1019 # exists does not raise any error
1020 # will not raise error because a wrong repos added by
1021 # anyone could prevent instantiating any ns
1023 "Error adding repo id: {}, err_msg: {} ".format(
1028 return deleted_repo_list
, added_repo_dict
1030 else: # else db_k8scluster does not exist
1032 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
1035 except Exception as e
:
1036 self
.log
.error("Error synchronizing repos: {}".format(str(e
)))
1037 raise K8sException("Error synchronizing repos")
1040 ####################################################################################
1041 ################################### P R I V A T E ##################################
1042 ####################################################################################
async def _exec_inspect_comand(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Run `helm inspect <inspect_command>` for a chart and return its output.

    NOTE(review): method name keeps the historical misspelling 'comand'
    because the public wrappers (inspect_kdu/values_kdu/help_kdu) call it.

    :param inspect_command: '' (full inspect), 'values' or 'readme'
    :param kdu_model: chart reference; when repo_url is given, a leading
        'repo/' prefix is stripped so helm resolves it against --repo
    :param repo_url: optional repository URL
    :return: raw command output
    """
    repo_str = ""
    if repo_url:
        repo_str = " --repo {}".format(repo_url)
        # drop the 'repo/' prefix from the model, if any
        idx = kdu_model.find("/")
        if idx >= 0:
            idx += 1
            kdu_model = kdu_model[idx:]

    inspect_command = "{} inspect {} {}{}".format(
        self._helm_command, inspect_command, kdu_model, repo_str
    )
    output, _rc = await self._local_async_exec(
        command=inspect_command, encode_utf8=True
    )

    return output
1066 async def _status_kdu(
1070 show_error_log
: bool = False,
1071 return_text
: bool = False,
1074 self
.log
.debug("status of kdu_instance {}".format(kdu_instance
))
1077 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
1078 cluster_name
=cluster_id
, create_if_not_exist
=True
1081 command
= "{} --kubeconfig={} --home={} status {} --output yaml".format(
1082 self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
1085 output
, rc
= await self
._local
_async
_exec
(
1087 raise_exception_on_error
=True,
1088 show_error_log
=show_error_log
,
1097 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1099 # remove field 'notes'
1101 del data
.get("info").get("status")["notes"]
1105 # parse field 'resources'
1107 resources
= str(data
.get("info").get("status").get("resources"))
1108 resource_table
= self
._output
_to
_table
(resources
)
1109 data
.get("info").get("status")["resources"] = resource_table
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Obtain the data of a single deployed release.

    :param cluster_uuid: the cluster or 'namespace:cluster'
    :param kdu_instance: release name to look up
    :return: the matching entry from `instances_list`, or None if not found
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get("Name") == kdu_instance:
            return instance
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
1124 def _generate_release_name(chart_name
: str):
1125 # check embeded chart (file or dir)
1126 if chart_name
.startswith("/"):
1127 # extract file or directory name
1128 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1130 elif "://" in chart_name
:
1131 # extract last portion of URL
1132 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1135 for c
in chart_name
:
1136 if c
.isalpha() or c
.isnumeric():
1143 # if does not start with alpha character, prefix 'a'
1144 if not name
[0].isalpha():
1149 def get_random_number():
1150 r
= random
.randrange(start
=1, stop
=99999999)
1152 s
= s
.rjust(10, "0")
1155 name
= name
+ get_random_number()
1158 async def _store_status(
1163 check_every
: float = 10,
1164 db_dict
: dict = None,
1165 run_once
: bool = False,
1167 previous_exception
= None
1170 await asyncio
.sleep(check_every
)
1171 detailed_status
= await self
._status
_kdu
(
1172 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
,
1175 status
= detailed_status
.get("info").get("Description")
1176 self
.log
.debug('KDU {} STATUS: {}.'.format(kdu_instance
, status
))
1177 # write status to db
1178 result
= await self
.write_app_status_to_db(
1181 detailed_status
=str(detailed_status
),
1182 operation
=operation
,
1185 self
.log
.info("Error writing in database. Task exiting...")
1187 except asyncio
.CancelledError
:
1188 self
.log
.debug("Task cancelled")
1190 except Exception as e
:
1191 # log only once in the while loop
1192 if str(previous_exception
) != str(e
):
1193 self
.log
.debug("_store_status exception: {}".format(str(e
)))
1194 previous_exception
= e
1199 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
1201 status
= await self
._status
_kdu
(
1202 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
1205 # extract info.status.resources-> str
1208 # NAME READY UP-TO-DATE AVAILABLE AGE
1209 # halting-horse-mongodb 0/1 1 0 0s
1210 # halting-petit-mongodb 1/1 1 0 0s
1212 resources
= K8sHelmConnector
._get
_deep
(status
, ("info", "status", "resources"))
1215 resources
= K8sHelmConnector
._output
_to
_table
(resources
)
1217 num_lines
= len(resources
)
1219 while index
< num_lines
:
1221 line1
= resources
[index
]
1223 # find '==>' in column 0
1224 if line1
[0] == "==>":
1225 line2
= resources
[index
]
1227 # find READY in column 1
1228 if line2
[1] == "READY":
1230 line3
= resources
[index
]
1232 while len(line3
) > 1 and index
< num_lines
:
1233 ready_value
= line3
[1]
1234 parts
= ready_value
.split(sep
="/")
1235 current
= int(parts
[0])
1236 total
= int(parts
[1])
1238 self
.log
.debug("NOT READY:\n {}".format(line3
))
1240 line3
= resources
[index
]
1248 def _parse_helm_status_service_info(self
, status
):
1250 # extract info.status.resources-> str
1253 # NAME READY UP-TO-DATE AVAILABLE AGE
1254 # halting-horse-mongodb 0/1 1 0 0s
1255 # halting-petit-mongodb 1/1 1 0 0s
1257 resources
= K8sHelmConnector
._get
_deep
(status
, ("info", "status", "resources"))
1260 first_line_skipped
= service_found
= False
1261 for line
in resources
:
1262 if not service_found
:
1263 if len(line
) >= 2 and line
[0] == "==>" and line
[1] == "v1/Service":
1264 service_found
= True
1267 if len(line
) >= 2 and line
[0] == "==>":
1268 service_found
= first_line_skipped
= False
1272 if not first_line_skipped
:
1273 first_line_skipped
= True
1275 service_list
.append(line
[0])
1280 def _get_deep(dictionary
: dict, members
: tuple):
1285 value
= target
.get(m
)
1294 # find key:value in several lines
1296 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1297 for line
in p_lines
:
1299 if line
.startswith(p_key
+ ":"):
1300 parts
= line
.split(":")
1301 the_value
= parts
[1].strip()
1308 # params for use in -f file
1309 # returns values file option and filename (in order to delete it at the end)
1310 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1312 if params
and len(params
) > 0:
1313 self
._get
_paths
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1315 def get_random_number():
1316 r
= random
.randrange(start
=1, stop
=99999999)
1324 value
= params
.get(key
)
1325 if "!!yaml" in str(value
):
1326 value
= yaml
.load(value
[7:])
1327 params2
[key
] = value
1329 values_file
= get_random_number() + ".yaml"
1330 with
open(values_file
, "w") as stream
:
1331 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1333 return "-f {}".format(values_file
), values_file
1337 # params for use in --set option
1339 def _params_to_set_option(params
: dict) -> str:
1341 if params
and len(params
) > 0:
1344 value
= params
.get(key
, None)
1345 if value
is not None:
1347 params_str
+= "--set "
1351 params_str
+= "{}={}".format(key
, value
)
1355 def _output_to_lines(output
: str) -> list:
1356 output_lines
= list()
1357 lines
= output
.splitlines(keepends
=False)
1361 output_lines
.append(line
)
1365 def _output_to_table(output
: str) -> list:
1366 output_table
= list()
1367 lines
= output
.splitlines(keepends
=False)
1369 line
= line
.replace("\t", " ")
1371 output_table
.append(line_list
)
1372 cells
= line
.split(sep
=" ")
1376 line_list
.append(cell
)
1380 self
, cluster_name
: str, create_if_not_exist
: bool = False
1381 ) -> (str, str, str, str):
1383 Returns kube and helm directories
1385 :param cluster_name:
1386 :param create_if_not_exist:
1387 :return: kube, helm directories, config filename and cluster dir.
1388 Raises exception if not exist and cannot create
1392 if base
.endswith("/") or base
.endswith("\\"):
1395 # base dir for cluster
1396 cluster_dir
= base
+ "/" + cluster_name
1397 if create_if_not_exist
and not os
.path
.exists(cluster_dir
):
1398 self
.log
.debug("Creating dir {}".format(cluster_dir
))
1399 os
.makedirs(cluster_dir
)
1400 if not os
.path
.exists(cluster_dir
):
1401 msg
= "Base cluster dir {} does not exist".format(cluster_dir
)
1403 raise K8sException(msg
)
1406 kube_dir
= cluster_dir
+ "/" + ".kube"
1407 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
1408 self
.log
.debug("Creating dir {}".format(kube_dir
))
1409 os
.makedirs(kube_dir
)
1410 if not os
.path
.exists(kube_dir
):
1411 msg
= "Kube config dir {} does not exist".format(kube_dir
)
1413 raise K8sException(msg
)
1416 helm_dir
= cluster_dir
+ "/" + ".helm"
1417 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
1418 self
.log
.debug("Creating dir {}".format(helm_dir
))
1419 os
.makedirs(helm_dir
)
1420 if not os
.path
.exists(helm_dir
):
1421 msg
= "Helm config dir {} does not exist".format(helm_dir
)
1423 raise K8sException(msg
)
1425 config_filename
= kube_dir
+ "/config"
1426 return kube_dir
, helm_dir
, config_filename
, cluster_dir
1429 def _remove_multiple_spaces(strobj
):
1430 strobj
= strobj
.strip()
1431 while " " in strobj
:
1432 strobj
= strobj
.replace(" ", " ")
1435 def _local_exec(self
, command
: str) -> (str, int):
1436 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1437 self
.log
.debug("Executing sync local command: {}".format(command
))
1438 # raise exception if fails
1441 output
= subprocess
.check_output(
1442 command
, shell
=True, universal_newlines
=True
1445 self
.log
.debug(output
)
1449 return output
, return_code
1451 async def _local_async_exec(
1454 raise_exception_on_error
: bool = False,
1455 show_error_log
: bool = True,
1456 encode_utf8
: bool = False,
1459 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1460 self
.log
.debug("Executing async local command: {}".format(command
))
1463 command
= command
.split(sep
=" ")
1466 process
= await asyncio
.create_subprocess_exec(
1467 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
1470 # wait for command terminate
1471 stdout
, stderr
= await process
.communicate()
1473 return_code
= process
.returncode
1477 output
= stdout
.decode("utf-8").strip()
1478 # output = stdout.decode()
1480 output
= stderr
.decode("utf-8").strip()
1481 # output = stderr.decode()
1483 if return_code
!= 0 and show_error_log
:
1485 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1488 self
.log
.debug("Return code: {}".format(return_code
))
1490 if raise_exception_on_error
and return_code
!= 0:
1491 raise K8sException(output
)
1494 output
= output
.encode("utf-8").strip()
1495 output
= str(output
).replace("\\n", "\n")
1497 return output
, return_code
1499 except asyncio
.CancelledError
:
1501 except K8sException
:
1503 except Exception as e
:
1504 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1506 if raise_exception_on_error
:
1507 raise K8sException(e
) from e
1511 def _check_file_exists(self
, filename
: str, exception_if_not_exists
: bool = False):
1512 # self.log.debug('Checking if file {} exists...'.format(filename))
1513 if os
.path
.exists(filename
):
1516 msg
= "File {} does not exist".format(filename
)
1517 if exception_if_not_exists
:
1518 # self.log.error(msg)
1519 raise K8sException(msg
)