5f004b37f17f627f66a3007a455e655b3c5a39a2
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
25 from shlex
import quote
33 from uuid
import uuid4
34 from urllib
.parse
import urlparse
36 from n2vc
.config
import EnvironConfig
37 from n2vc
.exceptions
import K8sException
38 from n2vc
.k8s_conn
import K8sConnector
39 from n2vc
.kubectl
import Kubectl
42 class K8sHelmBaseConnector(K8sConnector
):
45 ####################################################################################
46 ################################### P U B L I C ####################################
47 ####################################################################################
50 service_account
= "osm"
56 kubectl_command
: str = "/usr/bin/kubectl",
57 helm_command
: str = "/usr/bin/helm",
63 :param fs: file system for kubernetes and helm configuration
64 :param db: database object to write current operation status
65 :param kubectl_command: path to kubectl executable
66 :param helm_command: path to helm executable
68 :param on_update_db: callback called when k8s connector updates database
72 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
74 self
.log
.info("Initializing K8S Helm connector")
76 self
.config
= EnvironConfig()
77 # random numbers for release name generation
78 random
.seed(time
.time())
83 # exception if kubectl is not installed
84 self
.kubectl_command
= kubectl_command
85 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
87 # exception if helm is not installed
88 self
._helm
_command
= helm_command
89 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
91 # obtain stable repo url from config or apply default
92 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
93 if self
._stable
_repo
_url
== "None":
94 self
._stable
_repo
_url
= None
96 # Lock to avoid concurrent execution of helm commands
97 self
.cmd_lock
= asyncio
.Lock()
def _get_namespace(self, cluster_uuid: str) -> str:
    """
    Obtains the namespace used by the cluster with the uuid passed by argument

    :param cluster_uuid: cluster's uuid
    :return: value stored under the "namespace" key of the cluster record
    :raises K8sException: if no "k8sclusters" record is returned for the uuid
    """

    # first, obtain the cluster corresponding to the uuid passed by argument
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )

    # fail_on_empty=False is requested, so presumably the lookup can come back
    # empty (TODO confirm against the db driver); report that explicitly
    # instead of crashing with an AttributeError on None
    if k8scluster is None:
        raise K8sException("k8s cluster {} not found".format(cluster_uuid))

    return k8scluster.get("namespace")
115 namespace
: str = "kube-system",
116 reuse_cluster_uuid
=None,
118 ) -> tuple[str, bool]:
120 It prepares a given K8s cluster environment to run Charts
122 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
124 :param namespace: optional namespace to be used for helm. By default,
125 'kube-system' will be used
126 :param reuse_cluster_uuid: existing cluster uuid for reuse
127 :param kwargs: Additional parameters (None yet)
128 :return: uuid of the K8s cluster and True if connector has installed some
129 software in the cluster
130 (on error, an exception will be raised)
133 if reuse_cluster_uuid
:
134 cluster_id
= reuse_cluster_uuid
136 cluster_id
= str(uuid4())
139 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
142 paths
, env
= self
._init
_paths
_env
(
143 cluster_name
=cluster_id
, create_if_not_exist
=True
145 mode
= stat
.S_IRUSR | stat
.S_IWUSR
146 with
open(paths
["kube_config"], "w", mode
) as f
:
148 os
.chmod(paths
["kube_config"], 0o600)
150 # Code with initialization specific of helm version
151 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
153 # sync fs with local data
154 self
.fs
.reverse_sync(from_path
=cluster_id
)
156 self
.log
.info("Cluster {} initialized".format(cluster_id
))
158 return cluster_id
, n2vc_installed_sw
165 repo_type
: str = "chart",
168 password
: str = None,
172 "Cluster {}, adding {} repository {}. URL: {}".format(
173 cluster_uuid
, repo_type
, name
, url
178 paths
, env
= self
._init
_paths
_env
(
179 cluster_name
=cluster_uuid
, create_if_not_exist
=True
183 self
.fs
.sync(from_path
=cluster_uuid
)
186 if user
and password
:
187 host_port
= urlparse(url
).netloc
if url
.startswith("oci://") else url
188 # helm registry login url
189 command
= "env KUBECONFIG={} {} registry login {}".format(
190 paths
["kube_config"], self
._helm
_command
, quote(host_port
)
194 "OCI registry login is not needed for repo: {}".format(name
)
198 # helm repo add name url
199 command
= "env KUBECONFIG={} {} repo add {} {}".format(
200 paths
["kube_config"], self
._helm
_command
, quote(name
), quote(url
)
204 temp_cert_file
= os
.path
.join(
205 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
207 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
208 with
open(temp_cert_file
, "w") as the_cert
:
210 command
+= " --ca-file {}".format(quote(temp_cert_file
))
213 command
+= " --username={}".format(quote(user
))
216 command
+= " --password={}".format(quote(password
))
218 self
.log
.debug("adding repo: {}".format(command
))
219 await self
._local
_async
_exec
(
220 command
=command
, raise_exception_on_error
=True, env
=env
225 command
= "env KUBECONFIG={} {} repo update {}".format(
226 paths
["kube_config"], self
._helm
_command
, quote(name
)
228 self
.log
.debug("updating repo: {}".format(command
))
229 await self
._local
_async
_exec
(
230 command
=command
, raise_exception_on_error
=False, env
=env
234 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
236 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
238 "Cluster {}, updating {} repository {}".format(
239 cluster_uuid
, repo_type
, name
244 paths
, env
= self
._init
_paths
_env
(
245 cluster_name
=cluster_uuid
, create_if_not_exist
=True
249 self
.fs
.sync(from_path
=cluster_uuid
)
252 command
= "{} repo update {}".format(self
._helm
_command
, quote(name
))
253 self
.log
.debug("updating repo: {}".format(command
))
254 await self
._local
_async
_exec
(
255 command
=command
, raise_exception_on_error
=False, env
=env
259 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
261 async def repo_list(self
, cluster_uuid
: str) -> list:
263 Get the list of registered repositories
265 :return: list of registered repositories: [ (name, url) .... ]
268 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
271 paths
, env
= self
._init
_paths
_env
(
272 cluster_name
=cluster_uuid
, create_if_not_exist
=True
276 self
.fs
.sync(from_path
=cluster_uuid
)
278 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
279 paths
["kube_config"], self
._helm
_command
282 # Set exception to false because if there are no repos just want an empty list
283 output
, _rc
= await self
._local
_async
_exec
(
284 command
=command
, raise_exception_on_error
=False, env
=env
288 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
291 if output
and len(output
) > 0:
292 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
293 # unify format between helm2 and helm3 setting all keys lowercase
294 return self
._lower
_keys
_list
(repos
)
300 async def repo_remove(self
, cluster_uuid
: str, name
: str):
302 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
306 paths
, env
= self
._init
_paths
_env
(
307 cluster_name
=cluster_uuid
, create_if_not_exist
=True
311 self
.fs
.sync(from_path
=cluster_uuid
)
313 command
= "env KUBECONFIG={} {} repo remove {}".format(
314 paths
["kube_config"], self
._helm
_command
, quote(name
)
316 await self
._local
_async
_exec
(
317 command
=command
, raise_exception_on_error
=True, env
=env
321 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
327 uninstall_sw
: bool = False,
332 Resets the Kubernetes cluster by removing the helm deployment that represents it.
334 :param cluster_uuid: The UUID of the cluster to reset
335 :param force: Boolean to force the reset
336 :param uninstall_sw: Boolean to also uninstall the software installed by the connector in the cluster
337 :param kwargs: Additional parameters (None yet)
338 :return: Returns True if successful or raises an exception.
340 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
342 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
343 cluster_uuid
, uninstall_sw
348 self
.fs
.sync(from_path
=cluster_uuid
)
350 # uninstall releases if needed.
352 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
353 if len(releases
) > 0:
357 kdu_instance
= r
.get("name")
358 chart
= r
.get("chart")
360 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
362 await self
.uninstall(
363 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
365 except Exception as e
:
366 # will not raise exception as it was found
367 # that in some cases of previously installed helm releases it
370 "Error uninstalling release {}: {}".format(
376 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
377 ).format(cluster_uuid
)
380 False # Allow to remove k8s cluster without removing Tiller
384 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
386 # delete cluster directory
387 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
388 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
389 # Also remove the local directory if it still exists
390 direct
= self
.fs
.path
+ "/" + cluster_uuid
391 shutil
.rmtree(direct
, ignore_errors
=True)
def _is_helm_chart_a_file(self, chart_name: str):
    """
    Tells whether a chart reference contains more than one "/" separator
    (presumably meaning it is a path to a chart file rather than "repo/chart")

    :param chart_name: chart reference to inspect
    :return: True if "/" occurs at least twice in chart_name, False otherwise
    """
    # splitting on "/" yields one more piece than there are separators
    slash_total = len(chart_name.split("/")) - 1
    return slash_total >= 2
def _is_helm_chart_a_url(chart_name: str):
    """
    Tells whether a chart reference parses as a URL

    :param chart_name: chart reference to inspect
    :return: True only if urlparse() finds both a scheme and a network
             location in chart_name, False otherwise
    """
    parts = urlparse(chart_name)
    if not parts.scheme:
        return False
    return bool(parts.netloc)
403 async def _install_impl(
411 timeout
: float = 300,
413 db_dict
: dict = None,
414 kdu_name
: str = None,
415 namespace
: str = None,
418 paths
, env
= self
._init
_paths
_env
(
419 cluster_name
=cluster_id
, create_if_not_exist
=True
423 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
424 cluster_id
=cluster_id
, params
=params
427 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_id
)
429 command
= self
._get
_install
_command
(
437 paths
["kube_config"],
440 self
.log
.debug("installing: {}".format(command
))
443 # exec helm in a task
444 exec_task
= asyncio
.ensure_future(
445 coro_or_future
=self
._local
_async
_exec
(
446 command
=command
, raise_exception_on_error
=False, env
=env
450 # write status in another task
451 status_task
= asyncio
.ensure_future(
452 coro_or_future
=self
._store
_status
(
453 cluster_id
=cluster_id
,
454 kdu_instance
=kdu_instance
,
461 # wait for execution task
462 await asyncio
.wait([exec_task
])
467 output
, rc
= exec_task
.result()
470 output
, rc
= await self
._local
_async
_exec
(
471 command
=command
, raise_exception_on_error
=False, env
=env
474 # remove temporal values yaml file
476 os
.remove(file_to_delete
)
479 await self
._store
_status
(
480 cluster_id
=cluster_id
,
481 kdu_instance
=kdu_instance
,
488 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
490 raise K8sException(msg
)
496 kdu_model
: str = None,
498 timeout
: float = 300,
500 db_dict
: dict = None,
501 namespace
: str = None,
504 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
507 self
.fs
.sync(from_path
=cluster_uuid
)
509 # look for instance to obtain namespace
513 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
514 if not instance_info
:
515 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
516 namespace
= instance_info
["namespace"]
519 paths
, env
= self
._init
_paths
_env
(
520 cluster_name
=cluster_uuid
, create_if_not_exist
=True
524 self
.fs
.sync(from_path
=cluster_uuid
)
527 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
528 cluster_id
=cluster_uuid
, params
=params
531 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_uuid
)
533 command
= self
._get
_upgrade
_command
(
541 paths
["kube_config"],
545 self
.log
.debug("upgrading: {}".format(command
))
548 # exec helm in a task
549 exec_task
= asyncio
.ensure_future(
550 coro_or_future
=self
._local
_async
_exec
(
551 command
=command
, raise_exception_on_error
=False, env
=env
554 # write status in another task
555 status_task
= asyncio
.ensure_future(
556 coro_or_future
=self
._store
_status
(
557 cluster_id
=cluster_uuid
,
558 kdu_instance
=kdu_instance
,
565 # wait for execution task
566 await asyncio
.wait([exec_task
])
570 output
, rc
= exec_task
.result()
573 output
, rc
= await self
._local
_async
_exec
(
574 command
=command
, raise_exception_on_error
=False, env
=env
577 # remove temporal values yaml file
579 os
.remove(file_to_delete
)
582 await self
._store
_status
(
583 cluster_id
=cluster_uuid
,
584 kdu_instance
=kdu_instance
,
591 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
593 raise K8sException(msg
)
596 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
598 # return new revision number
599 instance
= await self
.get_instance_info(
600 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
603 revision
= int(instance
.get("revision"))
604 self
.log
.debug("New revision: {}".format(revision
))
614 total_timeout
: float = 1800,
615 cluster_uuid
: str = None,
616 kdu_model
: str = None,
618 db_dict
: dict = None,
621 """Scale a resource in a Helm Chart.
624 kdu_instance: KDU instance name
625 scale: Scale to which to set the resource
626 resource_name: Resource name
627 total_timeout: The time, in seconds, to wait
628 cluster_uuid: The UUID of the cluster
629 kdu_model: The chart reference
630 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
631 The --wait flag will be set automatically if --atomic is used
632 db_dict: Dictionary for any additional data
633 kwargs: Additional parameters
636 True if successful, False otherwise
639 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
641 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
642 resource_name
, kdu_model
, cluster_uuid
645 self
.log
.debug(debug_mgs
)
647 # look for instance to obtain namespace
648 # get_instance_info function calls the sync command
649 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
650 if not instance_info
:
651 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
654 paths
, env
= self
._init
_paths
_env
(
655 cluster_name
=cluster_uuid
, create_if_not_exist
=True
659 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_uuid
)
661 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
663 _
, replica_str
= await self
._get
_replica
_count
_url
(
664 kdu_model
, repo_url
, resource_name
667 command
= self
._get
_upgrade
_scale
_command
(
670 instance_info
["namespace"],
677 paths
["kube_config"],
680 self
.log
.debug("scaling: {}".format(command
))
683 # exec helm in a task
684 exec_task
= asyncio
.ensure_future(
685 coro_or_future
=self
._local
_async
_exec
(
686 command
=command
, raise_exception_on_error
=False, env
=env
689 # write status in another task
690 status_task
= asyncio
.ensure_future(
691 coro_or_future
=self
._store
_status
(
692 cluster_id
=cluster_uuid
,
693 kdu_instance
=kdu_instance
,
694 namespace
=instance_info
["namespace"],
700 # wait for execution task
701 await asyncio
.wait([exec_task
])
705 output
, rc
= exec_task
.result()
708 output
, rc
= await self
._local
_async
_exec
(
709 command
=command
, raise_exception_on_error
=False, env
=env
713 await self
._store
_status
(
714 cluster_id
=cluster_uuid
,
715 kdu_instance
=kdu_instance
,
716 namespace
=instance_info
["namespace"],
722 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
724 raise K8sException(msg
)
727 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
731 async def get_scale_count(
739 """Get a resource scale count.
742 cluster_uuid: The UUID of the cluster
743 resource_name: Resource name
744 kdu_instance: KDU instance name
745 kdu_model: The name or path of an Helm Chart
746 kwargs: Additional parameters
749 Resource instance count
753 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
756 # look for instance to obtain namespace
757 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
758 if not instance_info
:
759 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
762 paths
, _
= self
._init
_paths
_env
(
763 cluster_name
=cluster_uuid
, create_if_not_exist
=True
766 replicas
= await self
._get
_replica
_count
_instance
(
767 kdu_instance
=kdu_instance
,
768 namespace
=instance_info
["namespace"],
769 kubeconfig
=paths
["kube_config"],
770 resource_name
=resource_name
,
774 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
777 # Get default value if scale count is not found from provided values
778 # Important note: this piece of code shall only be executed in the first scaling operation,
779 # since it is expected that the _get_replica_count_instance is able to obtain the number of
780 # replicas when a scale operation was already conducted previously for this KDU/resource!
782 repo_url
= await self
._find
_repo
(
783 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
785 replicas
, _
= await self
._get
_replica
_count
_url
(
786 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
790 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
791 f
"{resource_name} obtained: {replicas}"
795 msg
= "Replica count not found. Cannot be scaled"
797 raise K8sException(msg
)
802 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
805 "rollback kdu_instance {} to revision {} from cluster {}".format(
806 kdu_instance
, revision
, cluster_uuid
811 self
.fs
.sync(from_path
=cluster_uuid
)
813 # look for instance to obtain namespace
814 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
815 if not instance_info
:
816 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
819 paths
, env
= self
._init
_paths
_env
(
820 cluster_name
=cluster_uuid
, create_if_not_exist
=True
824 self
.fs
.sync(from_path
=cluster_uuid
)
826 command
= self
._get
_rollback
_command
(
827 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
830 self
.log
.debug("rolling_back: {}".format(command
))
832 # exec helm in a task
833 exec_task
= asyncio
.ensure_future(
834 coro_or_future
=self
._local
_async
_exec
(
835 command
=command
, raise_exception_on_error
=False, env
=env
838 # write status in another task
839 status_task
= asyncio
.ensure_future(
840 coro_or_future
=self
._store
_status
(
841 cluster_id
=cluster_uuid
,
842 kdu_instance
=kdu_instance
,
843 namespace
=instance_info
["namespace"],
845 operation
="rollback",
849 # wait for execution task
850 await asyncio
.wait([exec_task
])
855 output
, rc
= exec_task
.result()
858 await self
._store
_status
(
859 cluster_id
=cluster_uuid
,
860 kdu_instance
=kdu_instance
,
861 namespace
=instance_info
["namespace"],
863 operation
="rollback",
867 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
869 raise K8sException(msg
)
872 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
874 # return new revision number
875 instance
= await self
.get_instance_info(
876 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
879 revision
= int(instance
.get("revision"))
880 self
.log
.debug("New revision: {}".format(revision
))
885 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
887 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
888 (this call should happen after all _terminate-config-primitive_ of the VNF
891 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
892 :param kdu_instance: unique name for the KDU instance to be deleted
893 :param kwargs: Additional parameters (None yet)
894 :return: True if successful
898 "uninstall kdu_instance {} from cluster {}".format(
899 kdu_instance
, cluster_uuid
904 self
.fs
.sync(from_path
=cluster_uuid
)
906 # look for instance to obtain namespace
907 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
908 if not instance_info
:
909 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
912 paths
, env
= self
._init
_paths
_env
(
913 cluster_name
=cluster_uuid
, create_if_not_exist
=True
917 self
.fs
.sync(from_path
=cluster_uuid
)
919 command
= self
._get
_uninstall
_command
(
920 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
922 output
, _rc
= await self
._local
_async
_exec
(
923 command
=command
, raise_exception_on_error
=True, env
=env
927 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
929 return self
._output
_to
_table
(output
)
931 async def instances_list(self
, cluster_uuid
: str) -> list:
933 returns a list of deployed releases in a cluster
935 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
939 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
942 self
.fs
.sync(from_path
=cluster_uuid
)
944 # execute internal command
945 result
= await self
._instances
_list
(cluster_uuid
)
948 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Looks up a release by name among the releases listed for a cluster

    :param cluster_uuid: cluster identifier handed to instances_list()
    :param kdu_instance: release name to search for
    :return: the first listed entry whose "name" equals kdu_instance,
             or None (after a debug log) when there is no such entry
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)

    # lazily scan the listing and stop at the first name match
    matches = (rel for rel in releases if rel.get("name") == kdu_instance)
    found = next(matches, None)

    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
960 async def upgrade_charm(
964 charm_id
: str = None,
965 charm_type
: str = None,
966 timeout
: float = None,
968 """This method upgrade charms in VNFs
971 ee_id: Execution environment id
972 path: Local path to the charm
974 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
975 timeout: (Float) Timeout for the ns update operation
978 The output of the update operation if status equals to "completed"
980 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
982 async def exec_primitive(
984 cluster_uuid
: str = None,
985 kdu_instance
: str = None,
986 primitive_name
: str = None,
987 timeout
: float = 300,
989 db_dict
: dict = None,
992 """Exec primitive (Juju action)
994 :param cluster_uuid: The UUID of the cluster or namespace:cluster
995 :param kdu_instance: The unique name of the KDU instance
996 :param primitive_name: Name of action that will be executed
997 :param timeout: Timeout for action execution
998 :param params: Dictionary of all the parameters needed for the action
999 :db_dict: Dictionary for any additional data
1000 :param kwargs: Additional parameters (None yet)
1002 :return: Returns the output of the action
1005 "KDUs deployed with Helm don't support actions "
1006 "different from rollback, upgrade and status"
1009 async def get_services(
1010 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
1013 Returns a list of services defined for the specified kdu instance.
1015 :param cluster_uuid: UUID of a K8s cluster known by OSM
1016 :param kdu_instance: unique name for the KDU instance
1017 :param namespace: K8s namespace used by the KDU instance
1018 :return: If successful, it will return a list of services, Each service
1019 can have the following data:
1020 - `name` of the service
1021 - `type` type of service in the k8 cluster
1022 - `ports` List of ports offered by the service, for each port includes at least
1023 name, port, protocol
1024 - `cluster_ip` Internal ip to be used inside k8s cluster
1025 - `external_ip` List of external ips (in case they are available)
1029 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1030 cluster_uuid
, kdu_instance
1035 paths
, env
= self
._init
_paths
_env
(
1036 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1040 self
.fs
.sync(from_path
=cluster_uuid
)
1042 # get list of services names for kdu
1043 service_names
= await self
._get
_services
(
1044 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1048 for service
in service_names
:
1049 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1050 service_list
.append(service
)
1053 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1057 async def get_service(
1058 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1061 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1062 service_name
, namespace
, cluster_uuid
1067 self
.fs
.sync(from_path
=cluster_uuid
)
1069 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1072 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1076 async def status_kdu(
1077 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1078 ) -> Union
[str, dict]:
1080 This call would retrieve the current state of a given KDU instance. It
1081 would allow retrieving the _composition_ (i.e. K8s objects) and _specific
1082 values_ of the configuration parameters applied to a given instance. This call
1083 would be based on the `status` call.
1085 :param cluster_uuid: UUID of a K8s cluster known by OSM
1086 :param kdu_instance: unique name for the KDU instance
1087 :param kwargs: Additional parameters (None yet)
1088 :param yaml_format: if the return shall be returned as an YAML string or as a
1090 :return: If successful, it will return the following vector of arguments:
1091 - K8s `namespace` in the cluster where the KDU lives
1092 - `state` of the KDU instance. It can be:
1099 - List of `resources` (objects) that this release consists of, sorted by kind,
1100 and the status of those resources
1101 - Last `deployment_time`.
1105 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1106 cluster_uuid
, kdu_instance
1111 self
.fs
.sync(from_path
=cluster_uuid
)
1113 # get instance: needed to obtain namespace
1114 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1115 for instance
in instances
:
1116 if instance
.get("name") == kdu_instance
:
1119 # instance does not exist
1121 "Instance name: {} not found in cluster: {}".format(
1122 kdu_instance
, cluster_uuid
1126 status
= await self
._status
_kdu
(
1127 cluster_id
=cluster_uuid
,
1128 kdu_instance
=kdu_instance
,
1129 namespace
=instance
["namespace"],
1130 yaml_format
=yaml_format
,
1131 show_error_log
=True,
1135 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1139 async def get_values_kdu(
1140 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1142 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1144 return await self
._exec
_get
_command
(
1145 get_command
="values",
1146 kdu_instance
=kdu_instance
,
1147 namespace
=namespace
,
1148 kubeconfig
=kubeconfig
,
1151 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1152 """Method to obtain the Helm Chart package's values
1155 kdu_model: The name or path of an Helm Chart
1156 repo_url: Helm Chart repository url
1159 str: the values of the Helm Chart package
1163 "inspect kdu_model values {} from (optional) repo: {}".format(
1168 return await self
._exec
_inspect
_command
(
1169 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1172 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1174 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1177 return await self
._exec
_inspect
_command
(
1178 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1181 async def synchronize_repos(self
, cluster_uuid
: str):
1182 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1184 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1185 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1187 local_repo_list
= await self
.repo_list(cluster_uuid
)
1188 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1190 deleted_repo_list
= []
1191 added_repo_dict
= {}
1193 # iterate over the list of repos in the database that should be
1194 # added if not present
1195 for repo_name
, db_repo
in db_repo_dict
.items():
1197 # check if it is already present
1198 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1199 repo_id
= db_repo
.get("_id")
1200 if curr_repo_url
!= db_repo
["url"]:
1203 "repo {} url changed, delete and and again".format(
1207 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1208 deleted_repo_list
.append(repo_id
)
1211 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1212 await self
.repo_add(
1216 cert
=db_repo
.get("ca_cert"),
1217 user
=db_repo
.get("user"),
1218 password
=db_repo
.get("password"),
1219 oci
=db_repo
.get("oci", False),
1221 added_repo_dict
[repo_id
] = db_repo
["name"]
1222 except Exception as e
:
1224 "Error adding repo id: {}, err_msg: {} ".format(
1229 # Delete repos that are present but not in nbi_list
1230 for repo_name
in local_repo_dict
:
1231 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1232 self
.log
.debug("delete repo {}".format(repo_name
))
1234 await self
.repo_remove(cluster_uuid
, repo_name
)
1235 deleted_repo_list
.append(repo_name
)
1236 except Exception as e
:
1238 "Error deleting repo, name: {}, err_msg: {}".format(
1243 return deleted_repo_list
, added_repo_dict
1245 except K8sException
:
1247 except Exception as e
:
1248 # Unexpected errors are logged and re-raised as a generic Exception
1249 self
.log
.error("Error synchronizing repos: {}".format(e
))
1250 raise Exception("Error synchronizing repos: {}".format(e
))
def _get_db_repos_dict(self, repo_ids: list):
    """
    Builds a dictionary of "k8srepos" database records indexed by repo name

    :param repo_ids: list of "_id" values to look up in the "k8srepos" collection
    :return: dictionary mapping each record's "name" to the whole record; when
             two records share a name, the one fetched last is kept
    """
    # one database query per id, issued in the order the ids are given
    fetched = (self.db.get_one("k8srepos", {"_id": rid}) for rid in repo_ids)
    return {record["name"]: record for record in fetched}
1260 ####################################################################################
1261 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1262 ####################################################################################
1266 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1268 Creates and returns base cluster and kube dirs and returns them.
1269 Also created helm3 dirs according to new directory specification, paths are
1270 not returned but assigned to helm environment variables
1272 :param cluster_name: cluster_name
1273 :return: Dictionary with config_paths and dictionary with helm environment variables
1277 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1279 Implements the helm version dependent cluster initialization
1283 async def _instances_list(self
, cluster_id
):
1285 Implements the helm version dependent helm instances list
1289 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1291 Implements the helm version dependent method to obtain services from a helm instance
1295 async def _status_kdu(
1299 namespace
: str = None,
1300 yaml_format
: bool = False,
1301 show_error_log
: bool = False,
1302 ) -> Union
[str, dict]:
1304 Implements the helm version dependent method to obtain status of a helm instance
1308 def _get_install_command(
1320 Obtain command to be executed to install the indicated instance
1324 def _get_upgrade_scale_command(
1337 """Generates the command to scale a Helm Chart release
1340 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1341 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1342 namespace (str): Namespace where this KDU instance is deployed
1343 scale (int): Scale count
1344 version (str): Constraint with specific version of the Chart to use
1345 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1346 The --wait flag will be set automatically if --atomic is used
1347 replica_str (str): The key under resource_name key where the scale count is stored
1348 timeout (float): The time, in seconds, to wait
1349 resource_name (str): The KDU's resource to scale
1350 kubeconfig (str): Kubeconfig file path
1353 str: command to scale a Helm Chart release
1357 def _get_upgrade_command(
1369 """Generates the command to upgrade a Helm Chart release
1372 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1373 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1374 namespace (str): Namespace where this KDU instance is deployed
1375 params_str (str): Params used to upgrade the Helm Chart release
1376 version (str): Constraint with specific version of the Chart to use
1377 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1378 The --wait flag will be set automatically if --atomic is used
1379 timeout (float): The time, in seconds, to wait
1380 kubeconfig (str): Kubeconfig file path
1381 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1383 str: command to upgrade a Helm Chart release
1387 def _get_rollback_command(
1388 self
, kdu_instance
, namespace
, revision
, kubeconfig
1391 Obtain command to be executed to rollback the indicated instance
1395 def _get_uninstall_command(
1396 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1399 Obtain command to be executed to delete the indicated instance
1403 def _get_inspect_command(
1404 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1406 """Generates the command to obtain the information about an Helm Chart package
1407 (´helm show ...´ command)
1410 show_command: the second part of the command (`helm show <show_command>`)
1411 kdu_model: The name or path of an Helm Chart
1412 repo_url: Helm Chart repository url
1413 version: constraint with specific version of the Chart to use
1416 str: the generated Helm Chart command
1420 def _get_get_command(
1421 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1423 """Obtain command to be executed to get information about the kdu instance."""
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    on the helm version.
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called
    """
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers.

    To be implemented by the subclasses.
    """
1441 ####################################################################################
1442 ################################### P R I V A T E ##################################
1443 ####################################################################################
1447 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1448 if os
.path
.exists(filename
):
1451 msg
= "File {} does not exist".format(filename
)
1452 if exception_if_not_exists
:
1453 raise K8sException(msg
)
1456 def _remove_multiple_spaces(strobj
):
1457 strobj
= strobj
.strip()
1458 while " " in strobj
:
1459 strobj
= strobj
.replace(" ", " ")
1463 def _output_to_lines(output
: str) -> list:
1464 output_lines
= list()
1465 lines
= output
.splitlines(keepends
=False)
1469 output_lines
.append(line
)
1473 def _output_to_table(output
: str) -> list:
1474 output_table
= list()
1475 lines
= output
.splitlines(keepends
=False)
1477 line
= line
.replace("\t", " ")
1479 output_table
.append(line_list
)
1480 cells
= line
.split(sep
=" ")
1484 line_list
.append(cell
)
1488 def _parse_services(output
: str) -> list:
1489 lines
= output
.splitlines(keepends
=False)
1492 line
= line
.replace("\t", " ")
1493 cells
= line
.split(sep
=" ")
1494 if len(cells
) > 0 and cells
[0].startswith("service/"):
1495 elems
= cells
[0].split(sep
="/")
1497 services
.append(elems
[1])
1501 def _get_deep(dictionary
: dict, members
: tuple):
1506 value
= target
.get(m
)
1515 # find key:value in several lines
1517 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1518 for line
in p_lines
:
1520 if line
.startswith(p_key
+ ":"):
1521 parts
= line
.split(":")
1522 the_value
= parts
[1].strip()
1530 def _lower_keys_list(input_list
: list):
1532 Transform the keys in a list of dictionaries to lower case and returns a new list
1537 for dictionary
in input_list
:
1538 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1539 new_list
.append(new_dict
)
1542 async def _local_async_exec(
1545 raise_exception_on_error
: bool = False,
1546 show_error_log
: bool = True,
1547 encode_utf8
: bool = False,
1549 ) -> tuple[str, int]:
1550 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1552 "Executing async local command: {}, env: {}".format(command
, env
)
1556 command
= shlex
.split(command
)
1558 environ
= os
.environ
.copy()
1563 async with self
.cmd_lock
:
1564 process
= await asyncio
.create_subprocess_exec(
1566 stdout
=asyncio
.subprocess
.PIPE
,
1567 stderr
=asyncio
.subprocess
.PIPE
,
1571 # wait for command terminate
1572 stdout
, stderr
= await process
.communicate()
1574 return_code
= process
.returncode
1578 output
= stdout
.decode("utf-8").strip()
1579 # output = stdout.decode()
1581 output
= stderr
.decode("utf-8").strip()
1582 # output = stderr.decode()
1584 if return_code
!= 0 and show_error_log
:
1586 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1589 self
.log
.debug("Return code: {}".format(return_code
))
1591 if raise_exception_on_error
and return_code
!= 0:
1592 raise K8sException(output
)
1595 output
= output
.encode("utf-8").strip()
1596 output
= str(output
).replace("\\n", "\n")
1598 return output
, return_code
1600 except asyncio
.CancelledError
:
1601 # first, kill the process if it is still running
1602 if process
.returncode
is None:
1605 except K8sException
:
1607 except Exception as e
:
1608 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1610 if raise_exception_on_error
:
1611 raise K8sException(e
) from e
1615 async def _local_async_exec_pipe(
1619 raise_exception_on_error
: bool = True,
1620 show_error_log
: bool = True,
1621 encode_utf8
: bool = False,
1624 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1625 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1626 command
= "{} | {}".format(command1
, command2
)
1628 "Executing async local command: {}, env: {}".format(command
, env
)
1632 command1
= shlex
.split(command1
)
1633 command2
= shlex
.split(command2
)
1635 environ
= os
.environ
.copy()
1640 async with self
.cmd_lock
:
1641 read
, write
= os
.pipe()
1642 process_1
= await asyncio
.create_subprocess_exec(
1643 *command1
, stdout
=write
, env
=environ
1646 process_2
= await asyncio
.create_subprocess_exec(
1647 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1650 stdout
, stderr
= await process_2
.communicate()
1652 return_code
= process_2
.returncode
1656 output
= stdout
.decode("utf-8").strip()
1657 # output = stdout.decode()
1659 output
= stderr
.decode("utf-8").strip()
1660 # output = stderr.decode()
1662 if return_code
!= 0 and show_error_log
:
1664 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1667 self
.log
.debug("Return code: {}".format(return_code
))
1669 if raise_exception_on_error
and return_code
!= 0:
1670 raise K8sException(output
)
1673 output
= output
.encode("utf-8").strip()
1674 output
= str(output
).replace("\\n", "\n")
1676 return output
, return_code
1677 except asyncio
.CancelledError
:
1678 # first, kill the processes if they are still running
1679 for process
in (process_1
, process_2
):
1680 if process
.returncode
is None:
1683 except K8sException
:
1685 except Exception as e
:
1686 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1688 if raise_exception_on_error
:
1689 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, and empty while no ip has been
    assigned yet
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command,
        paths["kube_config"],
        quote(namespace),
        quote(service_name),
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None while the load balancer has no ingress entry,
        # so fall back to an empty list instead of iterating None.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        # An ingress entry may carry only a "hostname"; such entries have no ip
        # to report and are skipped instead of raising KeyError.
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1740 async def _exec_get_command(
1741 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1743 """Obtains information about the kdu instance."""
1745 full_command
= self
._get
_get
_command
(
1746 get_command
, kdu_instance
, namespace
, kubeconfig
1749 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1753 async def _exec_inspect_command(
1754 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1756 """Obtains information about an Helm Chart package (´helm show´ command)
1759 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1760 kdu_model: The name or path of an Helm Chart
1761 repo_url: Helm Chart repository url
1764 str: the requested info about the Helm Chart package
1769 repo_str
= " --repo {}".format(quote(repo_url
))
1771 # Obtain the Chart's name and store it in the var kdu_model
1772 kdu_model
, _
= self
._split
_repo
(kdu_model
=kdu_model
)
1774 kdu_model
, version
= self
._split
_version
(kdu_model
)
1776 version_str
= "--version {}".format(quote(version
))
1780 full_command
= self
._get
_inspect
_command
(
1781 show_command
=inspect_command
,
1782 kdu_model
=quote(kdu_model
),
1784 version
=version_str
,
1787 output
, _
= await self
._local
_async
_exec
(command
=full_command
)
1791 async def _get_replica_count_url(
1794 repo_url
: str = None,
1795 resource_name
: str = None,
1796 ) -> tuple[int, str]:
1797 """Get the replica count value in the Helm Chart Values.
1800 kdu_model: The name or path of an Helm Chart
1801 repo_url: Helm Chart repository url
1802 resource_name: Resource name
1806 - The number of replicas of the specific instance; if not found, returns None; and
1807 - The string corresponding to the replica count key in the Helm values
1810 kdu_values
= yaml
.load(
1811 await self
.values_kdu(kdu_model
=kdu_model
, repo_url
=repo_url
),
1812 Loader
=yaml
.SafeLoader
,
1815 self
.log
.debug(f
"Obtained the Helm package values for the KDU: {kdu_values}")
1819 "kdu_values not found for kdu_model {}".format(kdu_model
)
1823 kdu_values
= kdu_values
.get(resource_name
, None)
1826 msg
= "resource {} not found in the values in model {}".format(
1827 resource_name
, kdu_model
1830 raise K8sException(msg
)
1832 duplicate_check
= False
1837 if kdu_values
.get("replicaCount") is not None:
1838 replicas
= kdu_values
["replicaCount"]
1839 replica_str
= "replicaCount"
1840 elif kdu_values
.get("replicas") is not None:
1841 duplicate_check
= True
1842 replicas
= kdu_values
["replicas"]
1843 replica_str
= "replicas"
1847 "replicaCount or replicas not found in the resource"
1848 "{} values in model {}. Cannot be scaled".format(
1849 resource_name
, kdu_model
1854 "replicaCount or replicas not found in the values"
1855 "in model {}. Cannot be scaled".format(kdu_model
)
1858 raise K8sException(msg
)
1860 # Control if replicas and replicaCount exists at the same time
1861 msg
= "replicaCount and replicas are exists at the same time"
1863 if "replicaCount" in kdu_values
:
1865 raise K8sException(msg
)
1867 if "replicas" in kdu_values
:
1869 raise K8sException(msg
)
1871 return replicas
, replica_str
1873 async def _get_replica_count_instance(
1878 resource_name
: str = None,
1880 """Get the replica count value in the instance.
1883 kdu_instance: The name of the KDU instance
1884 namespace: KDU instance namespace
1886 resource_name: Resource name
1889 The number of replicas of the specific instance; if not found, returns None
1892 kdu_values
= yaml
.load(
1893 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1894 Loader
=yaml
.SafeLoader
,
1897 self
.log
.debug(f
"Obtained the Helm values for the KDU instance: {kdu_values}")
1903 kdu_values
.get(resource_name
, None) if resource_name
else None
1906 for replica_str
in ("replicaCount", "replicas"):
1908 replicas
= resource_values
.get(replica_str
)
1910 replicas
= kdu_values
.get(replica_str
)
1912 if replicas
is not None:
1917 async def _store_status(
1922 namespace
: str = None,
1923 db_dict
: dict = None,
1926 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1928 :param cluster_id (str): the cluster where the KDU instance is deployed
1929 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1930 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1931 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1932 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1933 values for the keys:
1934 - "collection": The Mongo DB collection to write to
1935 - "filter": The query filter to use in the update process
1936 - "path": The dot separated keys which targets the object to be updated
1941 detailed_status
= await self
._status
_kdu
(
1942 cluster_id
=cluster_id
,
1943 kdu_instance
=kdu_instance
,
1945 namespace
=namespace
,
1948 status
= detailed_status
.get("info").get("description")
1949 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1951 # write status to db
1952 result
= await self
.write_app_status_to_db(
1955 detailed_status
=str(detailed_status
),
1956 operation
=operation
,
1960 self
.log
.info("Error writing in database. Task exiting...")
1962 except asyncio
.CancelledError
as e
:
1964 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1966 except Exception as e
:
1967 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1969 # params for use in -f file
1970 # returns values file option and filename (in order to delete it at the end)
1971 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> tuple[str, str]:
1972 if params
and len(params
) > 0:
1973 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1975 def get_random_number():
1976 r
= random
.SystemRandom().randint(1, 99999999)
1984 value
= params
.get(key
)
1985 if "!!yaml" in str(value
):
1986 value
= yaml
.safe_load(value
[7:])
1987 params2
[key
] = value
1989 values_file
= get_random_number() + ".yaml"
1990 with
open(values_file
, "w") as stream
:
1991 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1993 return "-f {}".format(values_file
), values_file
1997 # params for use in --set option
1999 def _params_to_set_option(params
: dict) -> str:
2001 f
"{quote(str(key))}={quote(str(value))}"
2002 for key
, value
in params
.items()
2003 if value
is not None
2007 return "--set " + ",".join(pairs
)
def generate_kdu_instance_name(**kwargs):
    """
    Generate a name for a KDU instance from the name of its chart.

    The chart name is reduced to its last path component when it is an
    embedded chart (absolute path) or a URL, every non alphanumeric character
    is replaced by "-", and the result is truncated to 35 characters and
    prefixed with "a" if it does not start with a letter. A "-" and a random
    number padded to 10 digits are appended, and the whole name is lowercased.

    :param kwargs: must contain "kdu_model", the chart name, path or URL
    :return: the generated instance name
    :raises KeyError: if "kdu_model" is not given
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep the last portion only. A
    # trailing "/" is dropped first, otherwise the last portion would be empty.
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )[:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    random_suffix = str(random.SystemRandom().randint(1, 99999999)).rjust(10, "0")
    return (name + "-" + random_suffix).lower()
2045 def _split_version(self
, kdu_model
: str) -> tuple[str, str]:
2049 self
._is
_helm
_chart
_a
_file
(kdu_model
)
2050 or self
._is
_helm
_chart
_a
_url
(kdu_model
)
2052 and ":" in kdu_model
2054 parts
= kdu_model
.split(sep
=":")
2056 version
= str(parts
[1])
2057 kdu_model
= parts
[0]
2058 return kdu_model
, version
2060 def _split_repo(self
, kdu_model
: str) -> tuple[str, str]:
2061 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2064 kdu_model (str): Associated KDU model
2067 (str, str): Tuple with the Chart name in index 0, and the repo name
2068 in index 2; if there was a problem finding them, return None
2075 idx
= kdu_model
.find("/")
2076 if not self
._is
_helm
_chart
_a
_url
(kdu_model
) and idx
>= 0:
2077 chart_name
= kdu_model
[idx
+ 1 :]
2078 repo_name
= kdu_model
[:idx
]
2080 return chart_name
, repo_name
2082 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2083 """Obtain the Helm repository for an Helm Chart
2086 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2087 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2090 str: the repository URL; if Helm Chart is a local one, the function returns None
2093 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2097 # Find repository link
2098 local_repo_list
= await self
.repo_list(cluster_uuid
)
2099 for repo
in local_repo_list
:
2100 if repo
["name"] == repo_name
:
2101 repo_url
= repo
["url"]
2102 break # it is not necessary to continue the loop if the repo link was found...
2106 def _repo_to_oci_url(self
, repo
):
2107 db_repo
= self
.db
.get_one("k8srepos", {"name": repo
}, fail_on_empty
=False)
2108 if db_repo
and "oci" in db_repo
:
2109 return db_repo
.get("url")
2111 async def _prepare_helm_chart(self
, kdu_model
, cluster_id
):
2112 # e.g.: "stable/openldap", "1.0"
2113 kdu_model
, version
= self
._split
_version
(kdu_model
)
2114 # e.g.: "openldap, stable"
2115 chart_name
, repo
= self
._split
_repo
(kdu_model
)
2116 if repo
and chart_name
: # repo/chart case
2117 oci_url
= self
._repo
_to
_oci
_url
(repo
)
2118 if oci_url
: # oci does not require helm repo update
2119 kdu_model
= f
"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
2121 await self
.repo_update(cluster_id
, repo
)
2122 return kdu_model
, version
2124 async def create_certificate(
2125 self
, cluster_uuid
, namespace
, dns_prefix
, name
, secret_name
, usage
2127 paths
, env
= self
._init
_paths
_env
(
2128 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2130 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2131 await kubectl
.create_certificate(
2132 namespace
=namespace
,
2134 dns_prefix
=dns_prefix
,
2135 secret_name
=secret_name
,
2137 issuer_name
="ca-issuer",
async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
    """
    Delete a certificate in a specific cluster

    :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
    :param namespace: namespace passed to Kubectl.delete_certificate
    :param certificate_name: certificate name passed to Kubectl.delete_certificate
    """
    # Only the paths are needed here; the helm environment is not used
    paths, _env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )
    kube_config = paths["kube_config"]
    await Kubectl(config_file=kube_config).delete_certificate(
        namespace, certificate_name
    )
2147 async def create_namespace(
2154 Create a namespace in a specific cluster
2156 :param namespace: Namespace to be created
2157 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2158 :param labels: Dictionary with labels for the new namespace
2161 paths
, env
= self
._init
_paths
_env
(
2162 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2164 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2165 await kubectl
.create_namespace(
2170 async def delete_namespace(
2176 Delete a namespace in a specific cluster
2178 :param namespace: namespace to be deleted
2179 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2182 paths
, env
= self
._init
_paths
_env
(
2183 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2185 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2186 await kubectl
.delete_namespace(
2190 async def copy_secret_data(
2196 src_namespace
: str = "osm",
2197 dst_namespace
: str = "osm",
2200 Copy a single key and value from an existing secret to a new one
2202 :param src_secret: name of the existing secret
2203 :param dst_secret: name of the new secret
2204 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2205 :param data_key: key of the existing secret to be copied
2206 :param src_namespace: Namespace of the existing secret
2207 :param dst_namespace: Namespace of the new secret
2210 paths
, env
= self
._init
_paths
_env
(
2211 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2213 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2214 secret_data
= await kubectl
.get_secret_content(
2216 namespace
=src_namespace
,
2218 # Only the corresponding data_key value needs to be copy
2219 data
= {data_key
: secret_data
.get(data_key
)}
2220 await kubectl
.create_secret(
2223 namespace
=dst_namespace
,
2224 secret_type
="Opaque",
2227 async def setup_default_rbac(
2238 Create a basic RBAC for a new namespace.
2240 :param name: name of both Role and Role Binding
2241 :param namespace: K8s namespace
2242 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2243 :param api_groups: Api groups to be allowed in Policy Rule
2244 :param resources: Resources to be allowed in Policy Rule
2245 :param verbs: Verbs to be allowed in Policy Rule
2246 :param service_account: Service Account name used to bind the Role
2249 paths
, env
= self
._init
_paths
_env
(
2250 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2252 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2253 await kubectl
.create_role(
2256 namespace
=namespace
,
2257 api_groups
=api_groups
,
2258 resources
=resources
,
2261 await kubectl
.create_role_binding(
2264 namespace
=namespace
,
2266 sa_name
=service_account
,