2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
29 from uuid
import uuid4
31 from n2vc
.exceptions
import K8sException
32 from n2vc
.k8s_conn
import K8sConnector
36 class K8sHelmConnector(K8sConnector
):
39 ####################################################################################
40 ################################### P U B L I C ####################################
41 ####################################################################################
43 service_account
= "osm"
49 kubectl_command
: str = "/usr/bin/kubectl",
50 helm_command
: str = "/usr/bin/helm",
56 :param fs: file system for kubernetes and helm configuration
57 :param db: database object to write current operation status
58 :param kubectl_command: path to kubectl executable
59 :param helm_command: path to helm executable
61 :param on_update_db: callback called when k8s connector updates database
65 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
67 self
.log
.info("Initializing K8S Helm connector")
69 # random numbers for release name generation
70 random
.seed(time
.time())
75 # exception if kubectl is not installed
76 self
.kubectl_command
= kubectl_command
77 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
79 # exception if helm is not installed
80 self
._helm
_command
= helm_command
81 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
83 # initialize helm client-only
84 self
.log
.debug("Initializing helm client-only...")
85 command
= "{} init --client-only".format(self
._helm
_command
)
87 asyncio
.ensure_future(
88 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
90 # loop = asyncio.get_event_loop()
91 # loop.run_until_complete(self._local_async_exec(command=command,
92 # raise_exception_on_error=False))
93 except Exception as e
:
95 msg
="helm init failed (it was already initialized): {}".format(e
)
98 self
.log
.info("K8S Helm connector initialized")
101 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
103 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
104 cluster_id for backward compatibility
106 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(':')
107 return namespace
, cluster_id
110 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None
113 It prepares a given K8s cluster environment to run Charts on both sides:
117 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
119 :param namespace: optional namespace to be used for helm. By default,
120 'kube-system' will be used
121 :param reuse_cluster_uuid: existing cluster uuid for reuse
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
129 namespace
= namespace_
or namespace
131 cluster_id
= str(uuid4())
132 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
134 self
.log
.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
))
136 # create config filename
137 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
138 cluster_name
=cluster_id
, create_if_not_exist
=True
140 with
open(config_filename
, "w") as f
:
143 # check if tiller pod is up in cluster
144 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
145 self
.kubectl_command
, config_filename
, namespace
147 output
, _rc
= await self
._local
_async
_exec
(
148 command
=command
, raise_exception_on_error
=True
151 output_table
= self
._output
_to
_table
(output
=output
)
153 # find 'tiller' pod in all pods
154 already_initialized
= False
156 for row
in output_table
:
157 if row
[0].startswith("tiller-deploy"):
158 already_initialized
= True
164 n2vc_installed_sw
= False
165 if not already_initialized
:
167 "Initializing helm in client and server: {}".format(cluster_id
)
169 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
170 self
.kubectl_command
, config_filename
, self
.service_account
)
171 _
, _rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
173 command
= ("{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
174 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
175 ).format(self
.kubectl_command
, config_filename
, self
.service_account
)
176 _
, _rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
178 command
= ("{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
179 "init").format(self
._helm
_command
, config_filename
, namespace
, helm_dir
,
180 self
.service_account
)
181 _
, _rc
= await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
182 n2vc_installed_sw
= True
184 # check client helm installation
185 check_file
= helm_dir
+ "/repository/repositories.yaml"
186 if not self
._check
_file
_exists
(filename
=check_file
, exception_if_not_exists
=False):
187 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
189 "{} --kubeconfig={} --tiller-namespace={} "
190 "--home={} init --client-only"
191 ).format(self
._helm
_command
, config_filename
, namespace
, helm_dir
)
192 output
, _rc
= await self
._local
_async
_exec
(
193 command
=command
, raise_exception_on_error
=True
196 self
.log
.info("Helm client already initialized")
198 self
.log
.info("Cluster {} initialized".format(cluster_id
))
200 return cluster_uuid
, n2vc_installed_sw
203 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
205 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
206 self
.log
.debug("Cluster {}, adding {} repository {}. URL: {}".format(
207 cluster_id
, repo_type
, name
, url
))
210 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
211 cluster_name
=cluster_id
, create_if_not_exist
=True
215 command
= "{} --kubeconfig={} --home={} repo update".format(
216 self
._helm
_command
, config_filename
, helm_dir
218 self
.log
.debug("updating repo: {}".format(command
))
219 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
221 # helm repo add name url
222 command
= "{} --kubeconfig={} --home={} repo add {} {}".format(
223 self
._helm
_command
, config_filename
, helm_dir
, name
, url
225 self
.log
.debug("adding repo: {}".format(command
))
226 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
228 async def repo_list(self
, cluster_uuid
: str) -> list:
230 Get the list of registered repositories
232 :return: list of registered repositories: [ (name, url) .... ]
235 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
236 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
239 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
240 cluster_name
=cluster_id
, create_if_not_exist
=True
243 command
= "{} --kubeconfig={} --home={} repo list --output yaml".format(
244 self
._helm
_command
, config_filename
, helm_dir
247 output
, _rc
= await self
._local
_async
_exec
(
248 command
=command
, raise_exception_on_error
=True
250 if output
and len(output
) > 0:
251 return yaml
.load(output
, Loader
=yaml
.SafeLoader
)
255 async def repo_remove(self
, cluster_uuid
: str, name
: str):
257 Remove a repository from OSM
259 :param cluster_uuid: the cluster or 'namespace:cluster'
260 :param name: repo name in OSM
261 :return: True if successful
264 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
265 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
268 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
269 cluster_name
=cluster_id
, create_if_not_exist
=True
272 command
= "{} --kubeconfig={} --home={} repo remove {}".format(
273 self
._helm
_command
, config_filename
, helm_dir
, name
276 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True)
279 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
282 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
284 "Resetting K8s environment. cluster uuid: {}".format(cluster_id
)
287 # get kube and helm directories
288 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
289 cluster_name
=cluster_id
, create_if_not_exist
=False
292 # uninstall releases if needed
293 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
294 if len(releases
) > 0:
298 kdu_instance
= r
.get("Name")
299 chart
= r
.get("Chart")
301 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
303 await self
.uninstall(
304 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
306 except Exception as e
:
308 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
312 "Cluster has releases and not force. Cannot reset K8s "
313 "environment. Cluster uuid: {}"
316 raise K8sException(msg
)
320 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
323 # find namespace for tiller pod
324 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
325 self
.kubectl_command
, config_filename
327 output
, _rc
= await self
._local
_async
_exec
(
328 command
=command
, raise_exception_on_error
=False
330 output_table
= K8sHelmConnector
._output
_to
_table
(output
=output
)
332 for r
in output_table
:
334 if "tiller-deploy" in r
[1]:
340 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
343 self
.log
.debug("namespace for tiller: {}".format(namespace
))
346 # uninstall tiller from cluster
348 "Uninstalling tiller from cluster {}".format(cluster_id
)
350 command
= "{} --kubeconfig={} --home={} reset".format(
351 self
._helm
_command
, config_filename
, helm_dir
353 self
.log
.debug("resetting: {}".format(command
))
354 output
, _rc
= await self
._local
_async
_exec
(
355 command
=command
, raise_exception_on_error
=True
357 # Delete clusterrolebinding and serviceaccount.
358 # Ignore if errors for backward compatibility
359 command
= ("{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
360 "io/osm-tiller-cluster-rule").format(self
.kubectl_command
,
362 output
, _rc
= await self
._local
_async
_exec
(command
=command
,
363 raise_exception_on_error
=False)
364 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".\
365 format(self
.kubectl_command
, config_filename
, self
.service_account
)
366 output
, _rc
= await self
._local
_async
_exec
(command
=command
,
367 raise_exception_on_error
=False)
370 self
.log
.debug("namespace not found")
372 # delete cluster directory
373 direct
= self
.fs
.path
+ "/" + cluster_id
374 self
.log
.debug("Removing directory {}".format(direct
))
375 shutil
.rmtree(direct
, ignore_errors
=True)
384 timeout
: float = 300,
386 db_dict
: dict = None,
387 kdu_name
: str = None,
388 namespace
: str = None,
391 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
392 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
395 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
396 cluster_name
=cluster_id
, create_if_not_exist
=True
400 # params_str = K8sHelmConnector._params_to_set_option(params)
401 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
402 cluster_id
=cluster_id
, params
=params
407 timeout_str
= "--timeout {}".format(timeout
)
412 atomic_str
= "--atomic"
416 namespace_str
= "--namespace {}".format(namespace
)
421 parts
= kdu_model
.split(sep
=":")
423 version_str
= "--version {}".format(parts
[1])
426 # generate a name for the release. Then, check if already exists
428 while kdu_instance
is None:
429 kdu_instance
= K8sHelmConnector
._generate
_release
_name
(kdu_model
)
431 result
= await self
._status
_kdu
(
432 cluster_id
=cluster_id
,
433 kdu_instance
=kdu_instance
,
434 show_error_log
=False,
436 if result
is not None:
437 # instance already exists: generate a new one
444 "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
445 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
446 helm
=self
._helm
_command
,
448 config
=config_filename
,
458 self
.log
.debug("installing: {}".format(command
))
461 # exec helm in a task
462 exec_task
= asyncio
.ensure_future(
463 coro_or_future
=self
._local
_async
_exec
(
464 command
=command
, raise_exception_on_error
=False
468 # write status in another task
469 status_task
= asyncio
.ensure_future(
470 coro_or_future
=self
._store
_status
(
471 cluster_id
=cluster_id
,
472 kdu_instance
=kdu_instance
,
479 # wait for execution task
480 await asyncio
.wait([exec_task
])
485 output
, rc
= exec_task
.result()
489 output
, rc
= await self
._local
_async
_exec
(
490 command
=command
, raise_exception_on_error
=False
493 # remove temporal values yaml file
495 os
.remove(file_to_delete
)
498 await self
._store
_status
(
499 cluster_id
=cluster_id
,
500 kdu_instance
=kdu_instance
,
508 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
510 raise K8sException(msg
)
512 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
515 async def instances_list(self
, cluster_uuid
: str) -> list:
517 returns a list of deployed releases in a cluster
519 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
523 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
524 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
527 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
528 cluster_name
=cluster_id
, create_if_not_exist
=True
531 command
= "{} --kubeconfig={} --home={} list --output yaml".format(
532 self
._helm
_command
, config_filename
, helm_dir
535 output
, _rc
= await self
._local
_async
_exec
(
536 command
=command
, raise_exception_on_error
=True
539 if output
and len(output
) > 0:
540 return yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
548 kdu_model
: str = None,
550 timeout
: float = 300,
552 db_dict
: dict = None,
555 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
556 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
559 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
560 cluster_name
=cluster_id
, create_if_not_exist
=True
564 # params_str = K8sHelmConnector._params_to_set_option(params)
565 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
566 cluster_id
=cluster_id
, params
=params
571 timeout_str
= "--timeout {}".format(timeout
)
576 atomic_str
= "--atomic"
580 if kdu_model
and ":" in kdu_model
:
581 parts
= kdu_model
.split(sep
=":")
583 version_str
= "--version {}".format(parts
[1])
588 "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
600 self
.log
.debug("upgrading: {}".format(command
))
604 # exec helm in a task
605 exec_task
= asyncio
.ensure_future(
606 coro_or_future
=self
._local
_async
_exec
(
607 command
=command
, raise_exception_on_error
=False
610 # write status in another task
611 status_task
= asyncio
.ensure_future(
612 coro_or_future
=self
._store
_status
(
613 cluster_id
=cluster_id
,
614 kdu_instance
=kdu_instance
,
621 # wait for execution task
622 await asyncio
.wait([exec_task
])
626 output
, rc
= exec_task
.result()
630 output
, rc
= await self
._local
_async
_exec
(
631 command
=command
, raise_exception_on_error
=False
634 # remove temporal values yaml file
636 os
.remove(file_to_delete
)
639 await self
._store
_status
(
640 cluster_id
=cluster_id
,
641 kdu_instance
=kdu_instance
,
649 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
651 raise K8sException(msg
)
653 # return new revision number
654 instance
= await self
.get_instance_info(
655 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
658 revision
= int(instance
.get("Revision"))
659 self
.log
.debug("New revision: {}".format(revision
))
665 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
668 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
670 "rollback kdu_instance {} to revision {} from cluster {}".format(
671 kdu_instance
, revision
, cluster_id
676 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
677 cluster_name
=cluster_id
, create_if_not_exist
=True
680 command
= "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
681 self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
, revision
684 # exec helm in a task
685 exec_task
= asyncio
.ensure_future(
686 coro_or_future
=self
._local
_async
_exec
(
687 command
=command
, raise_exception_on_error
=False
690 # write status in another task
691 status_task
= asyncio
.ensure_future(
692 coro_or_future
=self
._store
_status
(
693 cluster_id
=cluster_id
,
694 kdu_instance
=kdu_instance
,
696 operation
="rollback",
701 # wait for execution task
702 await asyncio
.wait([exec_task
])
707 output
, rc
= exec_task
.result()
710 await self
._store
_status
(
711 cluster_id
=cluster_id
,
712 kdu_instance
=kdu_instance
,
714 operation
="rollback",
720 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
722 raise K8sException(msg
)
724 # return new revision number
725 instance
= await self
.get_instance_info(
726 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
729 revision
= int(instance
.get("Revision"))
730 self
.log
.debug("New revision: {}".format(revision
))
735 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str):
737 Removes an existing KDU instance. It would implicitly use the `delete` call
738 (this call would happen after all _terminate-config-primitive_ of the VNF
741 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
742 :param kdu_instance: unique name for the KDU instance to be deleted
743 :return: True if successful
746 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
748 "uninstall kdu_instance {} from cluster {}".format(
749 kdu_instance
, cluster_id
754 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
755 cluster_name
=cluster_id
, create_if_not_exist
=True
758 command
= "{} --kubeconfig={} --home={} delete --purge {}".format(
759 self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
762 output
, _rc
= await self
._local
_async
_exec
(
763 command
=command
, raise_exception_on_error
=True
766 return self
._output
_to
_table
(output
)
768 async def exec_primitive(
770 cluster_uuid
: str = None,
771 kdu_instance
: str = None,
772 primitive_name
: str = None,
773 timeout
: float = 300,
775 db_dict
: dict = None,
777 """Exec primitive (Juju action)
779 :param cluster_uuid str: The UUID of the cluster or namespace:cluster
780 :param kdu_instance str: The unique name of the KDU instance
781 :param primitive_name: Name of action that will be executed
782 :param timeout: Timeout for action execution
783 :param params: Dictionary of all the parameters needed for the action
784 :db_dict: Dictionary for any additional data
786 :return: Returns the output of the action
789 "KDUs deployed with Helm don't support actions "
790 "different from rollback, upgrade and status"
793 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
796 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
799 return await self
._exec
_inspect
_comand
(
800 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
803 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
806 "inspect kdu_model values {} from (optional) repo: {}".format(
811 return await self
._exec
_inspect
_comand
(
812 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
815 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
818 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
821 return await self
._exec
_inspect
_comand
(
822 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
825 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str) -> str:
827 # call internal function
828 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
829 return await self
._status
_kdu
(
830 cluster_id
=cluster_id
,
831 kdu_instance
=kdu_instance
,
836 async def get_services(self
,
839 namespace
: str) -> list:
842 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
843 cluster_uuid
, kdu_instance
847 status
= await self
._status
_kdu
(
848 cluster_uuid
, kdu_instance
, return_text
=False
851 service_names
= self
._parse
_helm
_status
_service
_info
(status
)
853 for service
in service_names
:
854 service
= await self
.get_service(cluster_uuid
, service
, namespace
)
855 service_list
.append(service
)
859 async def get_service(self
,
862 namespace
: str) -> object:
865 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
866 service_name
, namespace
, cluster_uuid
)
870 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
871 cluster_name
=cluster_uuid
, create_if_not_exist
=True
874 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
875 self
.kubectl_command
, config_filename
, namespace
, service_name
878 output
, _rc
= await self
._local
_async
_exec
(
879 command
=command
, raise_exception_on_error
=True
882 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
885 "name": service_name
,
886 "type": self
._get
_deep
(data
, ("spec", "type")),
887 "ports": self
._get
_deep
(data
, ("spec", "ports")),
888 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP"))
890 if service
["type"] == "LoadBalancer":
891 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
892 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
893 service
["external_ip"] = ip_list
897 async def synchronize_repos(self
, cluster_uuid
: str):
899 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
900 self
.log
.debug("syncronize repos for cluster helm-id: {}",)
902 update_repos_timeout
= (
903 300 # max timeout to sync a single repos, more than this is too much
905 db_k8scluster
= self
.db
.get_one(
906 "k8sclusters", {"_admin.helm-chart.id": cluster_uuid
}
910 db_k8scluster
.get("_admin").get("helm_chart_repos") or []
912 cluster_repo_dict
= (
913 db_k8scluster
.get("_admin").get("helm_charts_added") or {}
915 # elements that must be deleted
916 deleted_repo_list
= []
918 self
.log
.debug("helm_chart_repos: {}".format(nbi_repo_list
))
919 self
.log
.debug("helm_charts_added: {}".format(cluster_repo_dict
))
921 # obtain repos to add: registered by nbi but not added
923 repo
for repo
in nbi_repo_list
if not cluster_repo_dict
.get(repo
)
926 # obtain repos to delete: added by cluster but not in nbi list
929 for repo
in cluster_repo_dict
.keys()
930 if repo
not in nbi_repo_list
933 # delete repos: must delete first then add because there may be
934 # different repos with same name but
935 # different id and url
936 self
.log
.debug("repos to delete: {}".format(repos_to_delete
))
937 for repo_id
in repos_to_delete
:
938 # try to delete repos
940 repo_delete_task
= asyncio
.ensure_future(
942 cluster_uuid
=cluster_uuid
,
943 name
=cluster_repo_dict
[repo_id
],
946 await asyncio
.wait_for(repo_delete_task
, update_repos_timeout
)
947 except Exception as e
:
949 "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
950 repo_id
, cluster_repo_dict
[repo_id
], str(e
)
953 # always add to the list of to_delete if there is an error
954 # because if is not there
955 # deleting raises error
956 deleted_repo_list
.append(repo_id
)
959 self
.log
.debug("repos to add: {}".format(repos_to_add
))
960 for repo_id
in repos_to_add
:
961 # obtain the repo data from the db
962 # if there is an error getting the repo in the database we will
963 # ignore this repo and continue
964 # because there is a possible race condition where the repo has
965 # been deleted while processing
966 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
968 "obtained repo: id, {}, name: {}, url: {}".format(
969 repo_id
, db_repo
["name"], db_repo
["url"]
973 repo_add_task
= asyncio
.ensure_future(
975 cluster_uuid
=cluster_uuid
,
976 name
=db_repo
["name"],
981 await asyncio
.wait_for(repo_add_task
, update_repos_timeout
)
982 added_repo_dict
[repo_id
] = db_repo
["name"]
984 "added repo: id, {}, name: {}".format(
985 repo_id
, db_repo
["name"]
988 except Exception as e
:
989 # deal with error adding repo, adding a repo that already
990 # exists does not raise any error
991 # will not raise error because a wrong repos added by
992 # anyone could prevent instantiating any ns
994 "Error adding repo id: {}, err_msg: {} ".format(
999 return deleted_repo_list
, added_repo_dict
1001 else: # else db_k8scluster does not exist
1003 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
1006 except Exception as e
:
1007 self
.log
.error("Error synchronizing repos: {}".format(str(e
)))
1008 raise K8sException("Error synchronizing repos")
1011 ####################################################################################
1012 ################################### P R I V A T E ##################################
1013 ####################################################################################
1016 async def _exec_inspect_comand(
1017 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1022 repo_str
= " --repo {}".format(repo_url
)
1023 idx
= kdu_model
.find("/")
1026 kdu_model
= kdu_model
[idx
:]
1028 inspect_command
= "{} inspect {} {}{}".format(
1029 self
._helm
_command
, inspect_command
, kdu_model
, repo_str
1031 output
, _rc
= await self
._local
_async
_exec
(
1032 command
=inspect_command
, encode_utf8
=True
1037 async def _status_kdu(
1041 show_error_log
: bool = False,
1042 return_text
: bool = False,
1045 self
.log
.debug("status of kdu_instance {}".format(kdu_instance
))
1048 _kube_dir
, helm_dir
, config_filename
, _cluster_dir
= self
._get
_paths
(
1049 cluster_name
=cluster_id
, create_if_not_exist
=True
1052 command
= "{} --kubeconfig={} --home={} status {} --output yaml".format(
1053 self
._helm
_command
, config_filename
, helm_dir
, kdu_instance
1056 output
, rc
= await self
._local
_async
_exec
(
1058 raise_exception_on_error
=True,
1059 show_error_log
=show_error_log
,
1068 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1070 # remove field 'notes'
1072 del data
.get("info").get("status")["notes"]
1076 # parse field 'resources'
1078 resources
= str(data
.get("info").get("status").get("resources"))
1079 resource_table
= self
._output
_to
_table
(resources
)
1080 data
.get("info").get("status")["resources"] = resource_table
1086 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
1087 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
1088 for instance
in instances
:
1089 if instance
.get("Name") == kdu_instance
:
1091 self
.log
.debug("Instance {} not found".format(kdu_instance
))
1095 def _generate_release_name(chart_name
: str):
1096 # check embeded chart (file or dir)
1097 if chart_name
.startswith("/"):
1098 # extract file or directory name
1099 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1101 elif "://" in chart_name
:
1102 # extract last portion of URL
1103 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1106 for c
in chart_name
:
1107 if c
.isalpha() or c
.isnumeric():
1114 # if does not start with alpha character, prefix 'a'
1115 if not name
[0].isalpha():
1120 def get_random_number():
1121 r
= random
.randrange(start
=1, stop
=99999999)
1123 s
= s
.rjust(10, "0")
1126 name
= name
+ get_random_number()
1129 async def _store_status(
1134 check_every
: float = 10,
1135 db_dict
: dict = None,
1136 run_once
: bool = False,
1140 await asyncio
.sleep(check_every
)
1141 detailed_status
= await self
._status
_kdu
(
1142 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
,
1145 status
= detailed_status
.get("info").get("Description")
1146 self
.log
.debug('KDU {} STATUS: {}.'.format(kdu_instance
, status
))
1147 # write status to db
1148 result
= await self
.write_app_status_to_db(
1151 detailed_status
=str(detailed_status
),
1152 operation
=operation
,
1155 self
.log
.info("Error writing in database. Task exiting...")
1157 except asyncio
.CancelledError
:
1158 self
.log
.debug("Task cancelled")
1160 except Exception as e
:
1161 self
.log
.debug("_store_status exception: {}".format(str(e
)), exc_info
=True)
1167 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
1169 status
= await self
._status
_kdu
(
1170 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
1173 # extract info.status.resources-> str
1176 # NAME READY UP-TO-DATE AVAILABLE AGE
1177 # halting-horse-mongodb 0/1 1 0 0s
1178 # halting-petit-mongodb 1/1 1 0 0s
1180 resources
= K8sHelmConnector
._get
_deep
(status
, ("info", "status", "resources"))
1183 resources
= K8sHelmConnector
._output
_to
_table
(resources
)
1185 num_lines
= len(resources
)
1187 while index
< num_lines
:
1189 line1
= resources
[index
]
1191 # find '==>' in column 0
1192 if line1
[0] == "==>":
1193 line2
= resources
[index
]
1195 # find READY in column 1
1196 if line2
[1] == "READY":
1198 line3
= resources
[index
]
1200 while len(line3
) > 1 and index
< num_lines
:
1201 ready_value
= line3
[1]
1202 parts
= ready_value
.split(sep
="/")
1203 current
= int(parts
[0])
1204 total
= int(parts
[1])
1206 self
.log
.debug("NOT READY:\n {}".format(line3
))
1208 line3
= resources
[index
]
1216 def _parse_helm_status_service_info(self
, status
):
1218 # extract info.status.resources-> str
1221 # NAME READY UP-TO-DATE AVAILABLE AGE
1222 # halting-horse-mongodb 0/1 1 0 0s
1223 # halting-petit-mongodb 1/1 1 0 0s
1225 resources
= K8sHelmConnector
._get
_deep
(status
, ("info", "status", "resources"))
1228 first_line_skipped
= service_found
= False
1229 for line
in resources
:
1230 if not service_found
:
1231 if len(line
) >= 2 and line
[0] == "==>" and line
[1] == "v1/Service":
1232 service_found
= True
1235 if len(line
) >= 2 and line
[0] == "==>":
1236 service_found
= first_line_skipped
= False
1240 if not first_line_skipped
:
1241 first_line_skipped
= True
1243 service_list
.append(line
[0])
1248 def _get_deep(dictionary
: dict, members
: tuple):
1253 value
= target
.get(m
)
1262 # find key:value in several lines
1264 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1265 for line
in p_lines
:
1267 if line
.startswith(p_key
+ ":"):
1268 parts
= line
.split(":")
1269 the_value
= parts
[1].strip()
1276 # params for use in -f file
1277 # returns values file option and filename (in order to delete it at the end)
1278 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1280 if params
and len(params
) > 0:
1281 self
._get
_paths
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1283 def get_random_number():
1284 r
= random
.randrange(start
=1, stop
=99999999)
1292 value
= params
.get(key
)
1293 if "!!yaml" in str(value
):
1294 value
= yaml
.load(value
[7:])
1295 params2
[key
] = value
1297 values_file
= get_random_number() + ".yaml"
1298 with
open(values_file
, "w") as stream
:
1299 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1301 return "-f {}".format(values_file
), values_file
    # Renders chart params as a single helm "--set key=value,key2=value2" option string.
1307 def _params_to_set_option(params
: dict) -> str:
1309 if params
and len(params
) > 0:
1312 value
= params
.get(key
, None)
1313 if value
is not None:
1315 params_str
+= "--set "
1319 params_str
+= "{}={}".format(key
, value
)
1323 def _output_to_lines(output
: str) -> list:
1324 output_lines
= list()
1325 lines
= output
.splitlines(keepends
=False)
1329 output_lines
.append(line
)
1333 def _output_to_table(output
: str) -> list:
1334 output_table
= list()
1335 lines
= output
.splitlines(keepends
=False)
1337 line
= line
.replace("\t", " ")
1339 output_table
.append(line_list
)
1340 cells
= line
.split(sep
=" ")
1344 line_list
.append(cell
)
1348 self
, cluster_name
: str, create_if_not_exist
: bool = False
1349 ) -> (str, str, str, str):
1351 Returns kube and helm directories
1353 :param cluster_name:
1354 :param create_if_not_exist:
1355 :return: kube, helm directories, config filename and cluster dir.
1356 Raises exception if not exist and cannot create
1360 if base
.endswith("/") or base
.endswith("\\"):
1363 # base dir for cluster
1364 cluster_dir
= base
+ "/" + cluster_name
1365 if create_if_not_exist
and not os
.path
.exists(cluster_dir
):
1366 self
.log
.debug("Creating dir {}".format(cluster_dir
))
1367 os
.makedirs(cluster_dir
)
1368 if not os
.path
.exists(cluster_dir
):
1369 msg
= "Base cluster dir {} does not exist".format(cluster_dir
)
1371 raise K8sException(msg
)
1374 kube_dir
= cluster_dir
+ "/" + ".kube"
1375 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
1376 self
.log
.debug("Creating dir {}".format(kube_dir
))
1377 os
.makedirs(kube_dir
)
1378 if not os
.path
.exists(kube_dir
):
1379 msg
= "Kube config dir {} does not exist".format(kube_dir
)
1381 raise K8sException(msg
)
1384 helm_dir
= cluster_dir
+ "/" + ".helm"
1385 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
1386 self
.log
.debug("Creating dir {}".format(helm_dir
))
1387 os
.makedirs(helm_dir
)
1388 if not os
.path
.exists(helm_dir
):
1389 msg
= "Helm config dir {} does not exist".format(helm_dir
)
1391 raise K8sException(msg
)
1393 config_filename
= kube_dir
+ "/config"
1394 return kube_dir
, helm_dir
, config_filename
, cluster_dir
1397 def _remove_multiple_spaces(strobj
):
1398 strobj
= strobj
.strip()
1399 while " " in strobj
:
1400 strobj
= strobj
.replace(" ", " ")
1403 def _local_exec(self
, command
: str) -> (str, int):
1404 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1405 self
.log
.debug("Executing sync local command: {}".format(command
))
1406 # raise exception if fails
1409 output
= subprocess
.check_output(
1410 command
, shell
=True, universal_newlines
=True
1413 self
.log
.debug(output
)
1417 return output
, return_code
1419 async def _local_async_exec(
1422 raise_exception_on_error
: bool = False,
1423 show_error_log
: bool = True,
1424 encode_utf8
: bool = False,
1427 command
= K8sHelmConnector
._remove
_multiple
_spaces
(command
)
1428 self
.log
.debug("Executing async local command: {}".format(command
))
1431 command
= command
.split(sep
=" ")
1434 process
= await asyncio
.create_subprocess_exec(
1435 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
1438 # wait for command terminate
1439 stdout
, stderr
= await process
.communicate()
1441 return_code
= process
.returncode
1445 output
= stdout
.decode("utf-8").strip()
1446 # output = stdout.decode()
1448 output
= stderr
.decode("utf-8").strip()
1449 # output = stderr.decode()
1451 if return_code
!= 0 and show_error_log
:
1453 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1456 self
.log
.debug("Return code: {}".format(return_code
))
1458 if raise_exception_on_error
and return_code
!= 0:
1459 raise K8sException(output
)
1462 output
= output
.encode("utf-8").strip()
1463 output
= str(output
).replace("\\n", "\n")
1465 return output
, return_code
1467 except asyncio
.CancelledError
:
1469 except K8sException
:
1471 except Exception as e
:
1472 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1474 if raise_exception_on_error
:
1475 raise K8sException(e
) from e
1479 def _check_file_exists(self
, filename
: str, exception_if_not_exists
: bool = False):
1480 # self.log.debug('Checking if file {} exists...'.format(filename))
1481 if os
.path
.exists(filename
):
1484 msg
= "File {} does not exist".format(filename
)
1485 if exception_if_not_exists
:
1486 # self.log.error(msg)
1487 raise K8sException(msg
)