abf2d7e582391417df279403ad8a01d81cb1640f
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
25 from shlex
import quote
33 from uuid
import uuid4
34 from urllib
.parse
import urlparse
36 from n2vc
.config
import EnvironConfig
37 from n2vc
.exceptions
import K8sException
38 from n2vc
.k8s_conn
import K8sConnector
39 from n2vc
.kubectl
import Kubectl
42 class K8sHelmBaseConnector(K8sConnector
):
45 ####################################################################################
46 ################################### P U B L I C ####################################
47 ####################################################################################
50 service_account
= "osm"
56 kubectl_command
: str = "/usr/bin/kubectl",
57 helm_command
: str = "/usr/bin/helm",
63 :param fs: file system for kubernetes and helm configuration
64 :param db: database object to write current operation status
65 :param kubectl_command: path to kubectl executable
66 :param helm_command: path to helm executable
68 :param on_update_db: callback called when k8s connector updates database
72 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
74 self
.log
.info("Initializing K8S Helm connector")
76 self
.config
= EnvironConfig()
77 # random numbers for release name generation
78 random
.seed(time
.time())
83 # exception if kubectl is not installed
84 self
.kubectl_command
= kubectl_command
85 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
87 # exception if helm is not installed
88 self
._helm
_command
= helm_command
89 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
91 # obtain stable repo url from config or apply default
92 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
93 if self
._stable
_repo
_url
== "None":
94 self
._stable
_repo
_url
= None
96 # Lock to avoid concurrent execution of helm commands
97 self
.cmd_lock
= asyncio
.Lock()
99 def _get_namespace(self
, cluster_uuid
: str) -> str:
101 Obtains the namespace used by the cluster with the uuid passed by argument
103 param: cluster_uuid: cluster's uuid
106 # first, obtain the cluster corresponding to the uuid passed by argument
107 k8scluster
= self
.db
.get_one(
108 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
110 return k8scluster
.get("namespace")
115 namespace
: str = "kube-system",
116 reuse_cluster_uuid
=None,
118 ) -> tuple[str, bool]:
120 It prepares a given K8s cluster environment to run Charts
122 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
124 :param namespace: optional namespace to be used for helm. By default,
125 'kube-system' will be used
126 :param reuse_cluster_uuid: existing cluster uuid for reuse
127 :param kwargs: Additional parameters (None yet)
128 :return: uuid of the K8s cluster and True if connector has installed some
129 software in the cluster
130 (on error, an exception will be raised)
133 if reuse_cluster_uuid
:
134 cluster_id
= reuse_cluster_uuid
136 cluster_id
= str(uuid4())
139 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
142 paths
, env
= self
._init
_paths
_env
(
143 cluster_name
=cluster_id
, create_if_not_exist
=True
145 mode
= stat
.S_IRUSR | stat
.S_IWUSR
146 with
open(paths
["kube_config"], "w", mode
) as f
:
148 os
.chmod(paths
["kube_config"], 0o600)
150 # Code with initialization specific of helm version
151 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
153 # sync fs with local data
154 self
.fs
.reverse_sync(from_path
=cluster_id
)
156 self
.log
.info("Cluster {} initialized".format(cluster_id
))
158 return cluster_id
, n2vc_installed_sw
165 repo_type
: str = "chart",
168 password
: str = None,
172 "Cluster {}, adding {} repository {}. URL: {}".format(
173 cluster_uuid
, repo_type
, name
, url
178 paths
, env
= self
._init
_paths
_env
(
179 cluster_name
=cluster_uuid
, create_if_not_exist
=True
183 self
.fs
.sync(from_path
=cluster_uuid
)
186 if user
and password
:
187 host_port
= urlparse(url
).netloc
if url
.startswith("oci://") else url
188 # helm registry login url
189 command
= "env KUBECONFIG={} {} registry login {}".format(
190 paths
["kube_config"], self
._helm
_command
, quote(host_port
)
194 "OCI registry login is not needed for repo: {}".format(name
)
198 # helm repo add name url
199 command
= "env KUBECONFIG={} {} repo add {} {}".format(
200 paths
["kube_config"], self
._helm
_command
, quote(name
), quote(url
)
204 temp_cert_file
= os
.path
.join(
205 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
207 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
208 with
open(temp_cert_file
, "w") as the_cert
:
210 command
+= " --ca-file {}".format(quote(temp_cert_file
))
213 command
+= " --username={}".format(quote(user
))
216 command
+= " --password={}".format(quote(password
))
218 self
.log
.debug("adding repo: {}".format(command
))
219 await self
._local
_async
_exec
(
220 command
=command
, raise_exception_on_error
=True, env
=env
225 command
= "env KUBECONFIG={} {} repo update {}".format(
226 paths
["kube_config"], self
._helm
_command
, quote(name
)
228 self
.log
.debug("updating repo: {}".format(command
))
229 await self
._local
_async
_exec
(
230 command
=command
, raise_exception_on_error
=False, env
=env
234 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Run "helm repo update" for one repository of a cluster

    :param cluster_uuid: cluster identifier; used as the cluster name for
        _init_paths_env and as the path synchronized with the file system
    :param name: repository name; shell-quoted before being put in the command
    :param repo_type: only used in the debug message
    :return: None. The command is executed with raise_exception_on_error=False,
        so its output and return code are not inspected here
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(cluster_uuid, repo_type, name)
    )

    # init_env (the paths are not needed by this command, only the environment)
    _, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    command = "{} repo update {}".format(self._helm_command, quote(name))
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: cluster identifier; used as the cluster name for
        _init_paths_env and as the path synchronized with the file system
    :return: list of registered repositories: [ (name, url) .... ] as parsed from
        the YAML output of "helm repo list" and passed through _lower_keys_list;
        an empty list when the command produced no output
    """
    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # Set exception to false because if there are no repos just want an empty list
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    if not output:
        return []

    repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(repos)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Run "helm repo remove" for one repository of a cluster

    :param cluster_uuid: cluster identifier; used as the cluster name for
        _init_paths_env and as the path synchronized with the file system
    :param name: repository name; shell-quoted before being put in the command
    :return: None. The command is executed with raise_exception_on_error=True,
        so a failure is reported by _local_async_exec
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, quote(name)
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
327 uninstall_sw
: bool = False,
332 Resets the Kubernetes cluster by removing the helm deployment that represents it.
334 :param cluster_uuid: The UUID of the cluster to reset
335 :param force: Boolean to force the reset
336 :param uninstall_sw: Boolean to force the reset
337 :param kwargs: Additional parameters (None yet)
338 :return: Returns True if successful or raises an exception.
340 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
342 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
343 cluster_uuid
, uninstall_sw
348 self
.fs
.sync(from_path
=cluster_uuid
)
350 # uninstall releases if needed.
352 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
353 if len(releases
) > 0:
357 kdu_instance
= r
.get("name")
358 chart
= r
.get("chart")
360 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
362 await self
.uninstall(
363 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
365 except Exception as e
:
366 # will not raise exception as it was found
367 # that in some cases of previously installed helm releases it
370 "Error uninstalling release {}: {}".format(
376 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
377 ).format(cluster_uuid
)
380 False # Allow to remove k8s cluster without removing Tiller
384 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
386 # delete cluster directory
387 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
388 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
389 # Remove also local directorio if still exist
390 direct
= self
.fs
.path
+ "/" + cluster_uuid
391 shutil
.rmtree(direct
, ignore_errors
=True)
395 def _is_helm_chart_a_file(self
, chart_name
: str):
396 return chart_name
.count("/") > 1
399 def _is_helm_chart_a_url(chart_name
: str):
400 result
= urlparse(chart_name
)
401 return all([result
.scheme
, result
.netloc
])
403 async def _install_impl(
411 timeout
: float = 300,
413 db_dict
: dict = None,
414 kdu_name
: str = None,
415 namespace
: str = None,
418 paths
, env
= self
._init
_paths
_env
(
419 cluster_name
=cluster_id
, create_if_not_exist
=True
423 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
424 cluster_id
=cluster_id
, params
=params
427 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_id
)
429 command
= self
._get
_install
_command
(
437 paths
["kube_config"],
440 self
.log
.debug("installing: {}".format(command
))
443 # exec helm in a task
444 exec_task
= asyncio
.ensure_future(
445 coro_or_future
=self
._local
_async
_exec
(
446 command
=command
, raise_exception_on_error
=False, env
=env
450 # write status in another task
451 status_task
= asyncio
.ensure_future(
452 coro_or_future
=self
._store
_status
(
453 cluster_id
=cluster_id
,
454 kdu_instance
=kdu_instance
,
461 # wait for execution task
462 await asyncio
.wait([exec_task
])
467 output
, rc
= exec_task
.result()
470 output
, rc
= await self
._local
_async
_exec
(
471 command
=command
, raise_exception_on_error
=False, env
=env
474 # remove temporal values yaml file
476 os
.remove(file_to_delete
)
479 await self
._store
_status
(
480 cluster_id
=cluster_id
,
481 kdu_instance
=kdu_instance
,
488 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
490 raise K8sException(msg
)
496 kdu_model
: str = None,
498 timeout
: float = 300,
500 db_dict
: dict = None,
501 namespace
: str = None,
502 reset_values
: bool = False,
503 reuse_values
: bool = True,
504 reset_then_reuse_values
: bool = False,
507 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
510 self
.fs
.sync(from_path
=cluster_uuid
)
512 # look for instance to obtain namespace
516 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
517 if not instance_info
:
518 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
519 namespace
= instance_info
["namespace"]
522 paths
, env
= self
._init
_paths
_env
(
523 cluster_name
=cluster_uuid
, create_if_not_exist
=True
527 self
.fs
.sync(from_path
=cluster_uuid
)
530 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
531 cluster_id
=cluster_uuid
, params
=params
534 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_uuid
)
536 command
= self
._get
_upgrade
_command
(
544 paths
["kube_config"],
547 reset_then_reuse_values
,
551 self
.log
.debug("upgrading: {}".format(command
))
554 # exec helm in a task
555 exec_task
= asyncio
.ensure_future(
556 coro_or_future
=self
._local
_async
_exec
(
557 command
=command
, raise_exception_on_error
=False, env
=env
560 # write status in another task
561 status_task
= asyncio
.ensure_future(
562 coro_or_future
=self
._store
_status
(
563 cluster_id
=cluster_uuid
,
564 kdu_instance
=kdu_instance
,
571 # wait for execution task
572 await asyncio
.wait([exec_task
])
576 output
, rc
= exec_task
.result()
579 output
, rc
= await self
._local
_async
_exec
(
580 command
=command
, raise_exception_on_error
=False, env
=env
583 # remove temporal values yaml file
585 os
.remove(file_to_delete
)
588 await self
._store
_status
(
589 cluster_id
=cluster_uuid
,
590 kdu_instance
=kdu_instance
,
597 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
599 raise K8sException(msg
)
602 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
604 # return new revision number
605 instance
= await self
.get_instance_info(
606 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
609 revision
= int(instance
.get("revision"))
610 self
.log
.debug("New revision: {}".format(revision
))
620 total_timeout
: float = 1800,
621 cluster_uuid
: str = None,
622 kdu_model
: str = None,
624 db_dict
: dict = None,
627 """Scale a resource in a Helm Chart.
630 kdu_instance: KDU instance name
631 scale: Scale to which to set the resource
632 resource_name: Resource name
633 total_timeout: The time, in seconds, to wait
634 cluster_uuid: The UUID of the cluster
635 kdu_model: The chart reference
636 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
637 The --wait flag will be set automatically if --atomic is used
638 db_dict: Dictionary for any additional data
639 kwargs: Additional parameters
642 True if successful, False otherwise
645 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
647 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
648 resource_name
, kdu_model
, cluster_uuid
651 self
.log
.debug(debug_mgs
)
653 # look for instance to obtain namespace
654 # get_instance_info function calls the sync command
655 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
656 if not instance_info
:
657 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
660 paths
, env
= self
._init
_paths
_env
(
661 cluster_name
=cluster_uuid
, create_if_not_exist
=True
665 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_uuid
)
667 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
669 _
, replica_str
= await self
._get
_replica
_count
_url
(
670 kdu_model
, repo_url
, resource_name
673 command
= self
._get
_upgrade
_scale
_command
(
676 instance_info
["namespace"],
683 paths
["kube_config"],
686 self
.log
.debug("scaling: {}".format(command
))
689 # exec helm in a task
690 exec_task
= asyncio
.ensure_future(
691 coro_or_future
=self
._local
_async
_exec
(
692 command
=command
, raise_exception_on_error
=False, env
=env
695 # write status in another task
696 status_task
= asyncio
.ensure_future(
697 coro_or_future
=self
._store
_status
(
698 cluster_id
=cluster_uuid
,
699 kdu_instance
=kdu_instance
,
700 namespace
=instance_info
["namespace"],
706 # wait for execution task
707 await asyncio
.wait([exec_task
])
711 output
, rc
= exec_task
.result()
714 output
, rc
= await self
._local
_async
_exec
(
715 command
=command
, raise_exception_on_error
=False, env
=env
719 await self
._store
_status
(
720 cluster_id
=cluster_uuid
,
721 kdu_instance
=kdu_instance
,
722 namespace
=instance_info
["namespace"],
728 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
730 raise K8sException(msg
)
733 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
737 async def get_scale_count(
745 """Get a resource scale count.
748 cluster_uuid: The UUID of the cluster
749 resource_name: Resource name
750 kdu_instance: KDU instance name
751 kdu_model: The name or path of an Helm Chart
752 kwargs: Additional parameters
755 Resource instance count
759 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
762 # look for instance to obtain namespace
763 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
764 if not instance_info
:
765 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
768 paths
, _
= self
._init
_paths
_env
(
769 cluster_name
=cluster_uuid
, create_if_not_exist
=True
772 replicas
= await self
._get
_replica
_count
_instance
(
773 kdu_instance
=kdu_instance
,
774 namespace
=instance_info
["namespace"],
775 kubeconfig
=paths
["kube_config"],
776 resource_name
=resource_name
,
780 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
783 # Get default value if scale count is not found from provided values
784 # Important note: this piece of code shall only be executed in the first scaling operation,
785 # since it is expected that the _get_replica_count_instance is able to obtain the number of
786 # replicas when a scale operation was already conducted previously for this KDU/resource!
788 repo_url
= await self
._find
_repo
(
789 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
791 replicas
, _
= await self
._get
_replica
_count
_url
(
792 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
796 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
797 f
"{resource_name} obtained: {replicas}"
801 msg
= "Replica count not found. Cannot be scaled"
803 raise K8sException(msg
)
808 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
811 "rollback kdu_instance {} to revision {} from cluster {}".format(
812 kdu_instance
, revision
, cluster_uuid
817 self
.fs
.sync(from_path
=cluster_uuid
)
819 # look for instance to obtain namespace
820 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
821 if not instance_info
:
822 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
825 paths
, env
= self
._init
_paths
_env
(
826 cluster_name
=cluster_uuid
, create_if_not_exist
=True
830 self
.fs
.sync(from_path
=cluster_uuid
)
832 command
= self
._get
_rollback
_command
(
833 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
836 self
.log
.debug("rolling_back: {}".format(command
))
838 # exec helm in a task
839 exec_task
= asyncio
.ensure_future(
840 coro_or_future
=self
._local
_async
_exec
(
841 command
=command
, raise_exception_on_error
=False, env
=env
844 # write status in another task
845 status_task
= asyncio
.ensure_future(
846 coro_or_future
=self
._store
_status
(
847 cluster_id
=cluster_uuid
,
848 kdu_instance
=kdu_instance
,
849 namespace
=instance_info
["namespace"],
851 operation
="rollback",
855 # wait for execution task
856 await asyncio
.wait([exec_task
])
861 output
, rc
= exec_task
.result()
864 await self
._store
_status
(
865 cluster_id
=cluster_uuid
,
866 kdu_instance
=kdu_instance
,
867 namespace
=instance_info
["namespace"],
869 operation
="rollback",
873 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
875 raise K8sException(msg
)
878 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
880 # return new revision number
881 instance
= await self
.get_instance_info(
882 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
885 revision
= int(instance
.get("revision"))
886 self
.log
.debug("New revision: {}".format(revision
))
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or
    'uninstall' call (this call should happen after the
    _terminate-config-primitive_ of the VNF).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True if the instance is not found in the cluster (nothing to
        remove); otherwise the command output converted by _output_to_table
    """
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_uuid)
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        # nothing to uninstall: report it and stop before the namespace lookup
        self.log.warning("kdu_instance {} not found".format(kdu_instance))
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = self._get_uninstall_command(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(output)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the value produced by the helm version dependent _instances_list
    """
    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    result = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return result
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up one deployed release by name

    :param cluster_uuid: cluster whose releases are listed through instances_list
    :param kdu_instance: release name, compared with the "name" key of each entry
    :return: the first matching entry, or None (after a debug message) when no
        entry matches
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    # "or []" keeps the lookup safe if the listing yields nothing at all
    for instance in instances or []:
        if instance.get("name") == kdu_instance:
            return instance
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
966 async def upgrade_charm(
970 charm_id
: str = None,
971 charm_type
: str = None,
972 timeout
: float = None,
974 """This method upgrade charms in VNFs
977 ee_id: Execution environment id
978 path: Local path to the charm
980 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
981 timeout: (Float) Timeout for the ns update operation
984 The output of the update operation if status equals to "completed"
986 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
988 async def exec_primitive(
990 cluster_uuid
: str = None,
991 kdu_instance
: str = None,
992 primitive_name
: str = None,
993 timeout
: float = 300,
995 db_dict
: dict = None,
998 """Exec primitive (Juju action)
1000 :param cluster_uuid: The UUID of the cluster or namespace:cluster
1001 :param kdu_instance: The unique name of the KDU instance
1002 :param primitive_name: Name of action that will be executed
1003 :param timeout: Timeout for action execution
1004 :param params: Dictionary of all the parameters needed for the action
1005 :db_dict: Dictionary for any additional data
1006 :param kwargs: Additional parameters (None yet)
1008 :return: Returns the output of the action
1011 "KDUs deployed with Helm don't support actions "
1012 "different from rollback, upgrade and status"
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths (only the kubeconfig path is needed here)
    paths, _ = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get list of services names for kdu
    service_names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    # resolve every name into its service description, one request at a time
    service_list = []
    for service_name in service_names:
        service = await self._get_service(cluster_uuid, service_name, namespace)
        service_list.append(service)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
async def get_service(self, cluster_uuid: str, service_name: str, namespace: str):
    """
    Obtain the description of one service of a cluster

    :param cluster_uuid: cluster identifier; also the path synchronized with the
        file system before and after the query
    :param service_name: name of the service to look up
    :param namespace: K8s namespace where the service is looked up
    :return: the value produced by _get_service for that service
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    service = await self._get_service(cluster_uuid, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
) -> Union[str, dict]:
    """
    This call would retrieve the current state of a given KDU instance. It
    would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
    values_ of the configuration parameters applied to a given instance. This call
    would be based on the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :param yaml_format: if the return shall be returned as an YAML string or as a
        dictionary; forwarded unchanged to _status_kdu
    :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance
        - List of `resources` (objects) that this release consists of, sorted by kind,
        and the status of those resources
        - Last `deployment_time`.
    :raises K8sException: when no release named kdu_instance exists in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    instances = await self._instances_list(cluster_id=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            break
    else:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
async def get_values_kdu(self, kdu_instance: str, namespace: str, kubeconfig: str):
    """
    Obtain the values of a deployed KDU instance

    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :param kubeconfig: kubeconfig file path
    :return: the result of _exec_get_command invoked with get_command="values"
    """
    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    return await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Method to obtain the Helm Chart package's values

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the values of the Helm Chart package
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    return await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Method to obtain the Helm Chart package's readme

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the result of _exec_inspect_command invoked with
            inspect_command="readme"
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    return await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
1187 async def synchronize_repos(self
, cluster_uuid
: str):
1188 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1190 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1191 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1193 local_repo_list
= await self
.repo_list(cluster_uuid
)
1194 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1196 deleted_repo_list
= []
1197 added_repo_dict
= {}
1199 # iterate over the list of repos in the database that should be
1200 # added if not present
1201 for repo_name
, db_repo
in db_repo_dict
.items():
1203 # check if it is already present
1204 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1205 repo_id
= db_repo
.get("_id")
1206 if curr_repo_url
!= db_repo
["url"]:
1209 "repo {} url changed, delete and and again".format(
1213 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1214 deleted_repo_list
.append(repo_id
)
1217 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1218 await self
.repo_add(
1222 cert
=db_repo
.get("ca_cert"),
1223 user
=db_repo
.get("user"),
1224 password
=db_repo
.get("password"),
1225 oci
=db_repo
.get("oci", False),
1227 added_repo_dict
[repo_id
] = db_repo
["name"]
1228 except Exception as e
:
1230 "Error adding repo id: {}, err_msg: {} ".format(
1235 # Delete repos that are present but not in nbi_list
1236 for repo_name
in local_repo_dict
:
1237 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1238 self
.log
.debug("delete repo {}".format(repo_name
))
1240 await self
.repo_remove(cluster_uuid
, repo_name
)
1241 deleted_repo_list
.append(repo_name
)
1242 except Exception as e
:
1244 "Error deleting repo, name: {}, err_msg: {}".format(
1249 return deleted_repo_list
, added_repo_dict
1251 except K8sException
:
1253 except Exception as e
:
1254 # Do not raise errors synchronizing repos
1255 self
.log
.error("Error synchronizing repos: {}".format(e
))
1256 raise Exception("Error synchronizing repos: {}".format(e
))
1258 def _get_db_repos_dict(self
, repo_ids
: list):
1260 for repo_id
in repo_ids
:
1261 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1262 db_repos_dict
[db_repo
["name"]] = db_repo
1263 return db_repos_dict
1266 ####################################################################################
1267 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1268 ####################################################################################
1272 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1274 Creates and returns base cluster and kube dirs and returns them.
1275 Also created helm3 dirs according to new directory specification, paths are
1276 not returned but assigned to helm environment variables
1278 :param cluster_name: cluster_name
1279 :return: Dictionary with config_paths and dictionary with helm environment variables
1283 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1285 Implements the helm version dependent cluster initialization
1289 async def _instances_list(self
, cluster_id
):
1291 Implements the helm version dependent helm instances list
1295 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1297 Implements the helm version dependent method to obtain services from a helm instance
1301 async def _status_kdu(
1305 namespace
: str = None,
1306 yaml_format
: bool = False,
1307 show_error_log
: bool = False,
1308 ) -> Union
[str, dict]:
1310 Implements the helm version dependent method to obtain status of a helm instance
1314 def _get_install_command(
1326 Obtain command to be executed to delete the indicated instance
1330 def _get_upgrade_scale_command(
1343 """Generates the command to scale a Helm Chart release
1346 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1347 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1348 namespace (str): Namespace where this KDU instance is deployed
1349 scale (int): Scale count
1350 version (str): Constraint with specific version of the Chart to use
1351 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1352 The --wait flag will be set automatically if --atomic is used
1353 replica_str (str): The key under resource_name key where the scale count is stored
1354 timeout (float): The time, in seconds, to wait
1355 resource_name (str): The KDU's resource to scale
1356 kubeconfig (str): Kubeconfig file path
1359 str: command to scale a Helm Chart release
1363 def _get_upgrade_command(
1375 reset_then_reuse_values
,
1378 """Generates the command to upgrade a Helm Chart release
1381 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1382 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1383 namespace (str): Namespace where this KDU instance is deployed
1384 params_str (str): Params used to upgrade the Helm Chart release
1385 version (str): Constraint with specific version of the Chart to use
1386 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1387 The --wait flag will be set automatically if --atomic is used
1388 timeout (float): The time, in seconds, to wait
1389 kubeconfig (str): Kubeconfig file path
1390 reset_values(bool): If set, helm resets values instead of reusing previous values.
1391 reuse_values(bool): If set, helm reuses previous values.
1392 reset_then_reuse_values(bool): If set, helm resets values, then apply the last release's values
1393 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1395 str: command to upgrade a Helm Chart release
1399 def _get_rollback_command(
1400 self
, kdu_instance
, namespace
, revision
, kubeconfig
1403 Obtain command to be executed to rollback the indicated instance
1407 def _get_uninstall_command(
1408 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1411 Obtain command to be executed to delete the indicated instance
1415 def _get_inspect_command(
1416 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1418 """Generates the command to obtain the information about an Helm Chart package
1419 (´helm show ...´ command)
1422 show_command: the second part of the command (`helm show <show_command>`)
1423 kdu_model: The name or path of an Helm Chart
1424 repo_url: Helm Chart repository url
1425 version: constraint with specific version of the Chart to use
1428 str: the generated Helm Chart command
1432 def _get_get_command(
1433 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1435 """Obtain command to be executed to get information about the kdu instance."""
1438 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1440 Method call to uninstall cluster software for helm. This method is dependent
1442 For Helm v2 it will be called when Tiller must be uninstalled
1443 For Helm v3 it does nothing and does not need to be callled
1447 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1449 Obtains the cluster repos identifiers
1453 ####################################################################################
1454 ################################### P R I V A T E ##################################
1455 ####################################################################################
1459 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1460 if os
.path
.exists(filename
):
1463 msg
= "File {} does not exist".format(filename
)
1464 if exception_if_not_exists
:
1465 raise K8sException(msg
)
1468 def _remove_multiple_spaces(strobj
):
1469 strobj
= strobj
.strip()
1470 while " " in strobj
:
1471 strobj
= strobj
.replace(" ", " ")
1475 def _output_to_lines(output
: str) -> list:
1476 output_lines
= list()
1477 lines
= output
.splitlines(keepends
=False)
1481 output_lines
.append(line
)
1485 def _output_to_table(output
: str) -> list:
1486 output_table
= list()
1487 lines
= output
.splitlines(keepends
=False)
1489 line
= line
.replace("\t", " ")
1491 output_table
.append(line_list
)
1492 cells
= line
.split(sep
=" ")
1496 line_list
.append(cell
)
1500 def _parse_services(output
: str) -> list:
1501 lines
= output
.splitlines(keepends
=False)
1504 line
= line
.replace("\t", " ")
1505 cells
= line
.split(sep
=" ")
1506 if len(cells
) > 0 and cells
[0].startswith("service/"):
1507 elems
= cells
[0].split(sep
="/")
1509 services
.append(elems
[1])
1513 def _get_deep(dictionary
: dict, members
: tuple):
1518 value
= target
.get(m
)
1527 # find key:value in several lines
1529 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1530 for line
in p_lines
:
1532 if line
.startswith(p_key
+ ":"):
1533 parts
= line
.split(":")
1534 the_value
= parts
[1].strip()
1542 def _lower_keys_list(input_list
: list):
1544 Transform the keys in a list of dictionaries to lower case and returns a new list
1549 for dictionary
in input_list
:
1550 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1551 new_list
.append(new_dict
)
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> tuple[str, int]:
    """Run a local command asynchronously and return its output and return code.

    The command string is normalised, split with `shlex` and executed without a
    shell while holding `self.cmd_lock`.

    :param command: command line to execute
    :param raise_exception_on_error: raise K8sException on a non-zero return code
        or on an execution failure, instead of reporting it in the result
    :param show_error_log: log the output when the return code is non-zero
    :param encode_utf8: post-process the output through `encode("utf-8")` and `str()`
    :param env: extra environment variables, merged over a copy of `os.environ`
    :return: tuple (output, return_code). The output is stderr when the command
        wrote to it, otherwise stdout. ("", -1) is returned when execution fails
        and `raise_exception_on_error` is False
    :raises K8sException: see `raise_exception_on_error`
    """
    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # Bound before the try block: the task may be cancelled while waiting for
    # the lock or while spawning, i.e. before any subprocess exists.
    process = None
    try:
        async with self.cmd_lock:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=environ,
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            # stderr, when present, takes precedence over stdout
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        # first, kill the process if it was started and is still running
        if process is not None and process.returncode is None:
            process.kill()
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """Run `command1 | command2` locally and return the output of `command2`.

    Both commands are normalised, split with `shlex` and executed without a
    shell while holding `self.cmd_lock`; they are connected through an OS pipe.

    :param command1: command whose stdout feeds the pipe
    :param command2: command reading the pipe on stdin
    :param raise_exception_on_error: raise K8sException on a non-zero return code
        of `command2` or on an execution failure
    :param show_error_log: log the output when the return code is non-zero
    :param encode_utf8: post-process the output through `encode("utf-8")` and `str()`
    :param env: extra environment variables, merged over a copy of `os.environ`
    :return: tuple (output, return_code) of `command2`; ("", -1) when execution
        fails and `raise_exception_on_error` is False
    :raises K8sException: see `raise_exception_on_error`
    """
    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # Bound before the try block so the cancellation handler never reads an
    # unbound name when the task is cancelled before the processes exist.
    process_1 = None
    process_2 = None
    try:
        async with self.cmd_lock:
            read, write = os.pipe()
            try:
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                # The child owns its copy; close ours so the reader sees EOF.
                os.close(write)
                write = None
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
            finally:
                # Release our pipe ends even if spawning failed or was cancelled.
                for fd in (read, write):
                    if fd is not None:
                        os.close(fd)
            stdout, stderr = await process_2.communicate()

            return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            # stderr of command2 is not piped, so this is normally None
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        # first, kill the processes if they were started and are still running
        for process in (process_1, process_2):
            if process is not None and process.returncode is None:
                process.kill()
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, as read from `spec.ports`
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips; only present for LoadBalancer
      services, and empty while no ingress ip has been reported
    :raises K8sException: if the kubectl command fails
    """

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command,
        paths["kube_config"],
        quote(namespace),
        quote(service_name),
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None while the load balancer has no ingress yet,
        # and an ingress entry may carry only a hostname instead of an ip.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1752 async def _exec_get_command(
1753 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1755 """Obtains information about the kdu instance."""
1757 full_command
= self
._get
_get
_command
(
1758 get_command
, kdu_instance
, namespace
, kubeconfig
1761 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1765 async def _exec_inspect_command(
1766 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1768 """Obtains information about an Helm Chart package (´helm show´ command)
1771 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1772 kdu_model: The name or path of an Helm Chart
1773 repo_url: Helm Chart repository url
1776 str: the requested info about the Helm Chart package
1781 repo_str
= " --repo {}".format(quote(repo_url
))
1783 # Obtain the Chart's name and store it in the var kdu_model
1784 kdu_model
, _
= self
._split
_repo
(kdu_model
=kdu_model
)
1786 kdu_model
, version
= self
._split
_version
(kdu_model
)
1788 version_str
= "--version {}".format(quote(version
))
1792 full_command
= self
._get
_inspect
_command
(
1793 show_command
=inspect_command
,
1794 kdu_model
=quote(kdu_model
),
1796 version
=version_str
,
1799 output
, _
= await self
._local
_async
_exec
(command
=full_command
)
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str = None,
    resource_name: str = None,
) -> tuple[int, str]:
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica key is looked up
            under this key of the values instead of at the top level

    Returns:
        A tuple with:
        - The number of replicas found in the values; and
        - The string corresponding to the replica count key in the Helm values
          ("replicaCount" or "replicas")

    Raises:
        K8sException: if the values or the resource are not found, if neither
            key is set, or if both keys are present at the same time
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
        Loader=yaml.SafeLoader,
    )

    self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    duplicate_check = False

    replica_str = ""
    replicas = None

    if kdu_values.get("replicaCount") is not None:
        replicas = kdu_values["replicaCount"]
        replica_str = "replicaCount"
    elif kdu_values.get("replicas") is not None:
        duplicate_check = True
        replicas = kdu_values["replicas"]
        replica_str = "replicas"
    else:
        # Messages are single literals: the former adjacent literals were
        # concatenated without a separating space.
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource {} "
                "values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time
    msg = "replicaCount and replicas exist at the same time"
    if duplicate_check:
        if "replicaCount" in kdu_values:
            self.log.error(msg)
            raise K8sException(msg)
    else:
        if "replicas" in kdu_values:
            self.log.error(msg)
            raise K8sException(msg)

    return replicas, replica_str
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
) -> int:
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: passed through to `get_values_kdu`
        resource_name: Resource name

    Returns:
        The number of replicas of the specific instance; if not found, returns None
    """
    raw_values = await self.get_values_kdu(kdu_instance, namespace, kubeconfig)
    kdu_values = yaml.load(raw_values, Loader=yaml.SafeLoader)

    self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")

    if not kdu_values:
        return None

    resource_values = kdu_values.get(resource_name, None) if resource_name else None
    # Fall back to the top-level values when the resource has no usable section.
    source = resource_values if resource_values else kdu_values

    for replica_str in ("replicaCount", "replicas"):
        replicas = source.get(replica_str)
        if replicas is not None:
            return replicas
    return None
1929 async def _store_status(
1934 namespace
: str = None,
1935 db_dict
: dict = None,
1938 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1940 :param cluster_id (str): the cluster where the KDU instance is deployed
1941 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1942 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1943 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1944 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1945 values for the keys:
1946 - "collection": The Mongo DB collection to write to
1947 - "filter": The query filter to use in the update process
1948 - "path": The dot separated keys which targets the object to be updated
1953 detailed_status
= await self
._status
_kdu
(
1954 cluster_id
=cluster_id
,
1955 kdu_instance
=kdu_instance
,
1957 namespace
=namespace
,
1960 status
= detailed_status
.get("info").get("description")
1961 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1963 # write status to db
1964 result
= await self
.write_app_status_to_db(
1967 detailed_status
=str(detailed_status
),
1968 operation
=operation
,
1972 self
.log
.info("Error writing in database. Task exiting...")
1974 except asyncio
.CancelledError
as e
:
1976 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1978 except Exception as e
:
1979 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1981 # params for use in -f file
1982 # returns values file option and filename (in order to delete it at the end)
1983 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> tuple[str, str]:
1984 if params
and len(params
) > 0:
1985 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1987 def get_random_number():
1988 r
= random
.SystemRandom().randint(1, 99999999)
1996 value
= params
.get(key
)
1997 if "!!yaml" in str(value
):
1998 value
= yaml
.safe_load(value
[7:])
1999 params2
[key
] = value
2001 values_file
= get_random_number() + ".yaml"
2002 with
open(values_file
, "w") as stream
:
2003 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
2005 return "-f {}".format(values_file
), values_file
2009 # params for use in --set option
2011 def _params_to_set_option(params
: dict) -> str:
2013 f
"{quote(str(key))}={quote(str(value))}"
2014 for key
, value
in params
.items()
2015 if value
is not None
2019 return "--set " + ",".join(pairs
)
def generate_kdu_instance_name(**kwargs):
    """Generate a KDU instance name derived from the chart name.

    The `kdu_model` keyword argument is required. For embedded charts (absolute
    path) and URLs only the last path component is used. Characters that are
    neither letters nor digits become "-", the result is cut to 35 characters,
    prefixed with "a" when it does not start with a letter (including when it is
    empty), and completed with "-" plus a zero-padded 10-digit random number.

    :return: the generated name, in lower case
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last portion
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )[:35]

    # If empty (e.g. the model ends with "/") or not starting with an alpha
    # character, prefix 'a'; the emptiness check avoids an IndexError.
    if not name or not name[0].isalpha():
        name = "a" + name

    random_suffix = str(random.SystemRandom().randint(1, 99999999)).rjust(10, "0")
    return (name + "-" + random_suffix).lower()
2057 def _split_version(self
, kdu_model
: str) -> tuple[str, str]:
2061 self
._is
_helm
_chart
_a
_file
(kdu_model
)
2062 or self
._is
_helm
_chart
_a
_url
(kdu_model
)
2064 and ":" in kdu_model
2066 parts
= kdu_model
.split(sep
=":")
2068 version
= str(parts
[1])
2069 kdu_model
= parts
[0]
2070 return kdu_model
, version
2072 def _split_repo(self
, kdu_model
: str) -> tuple[str, str]:
2073 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2076 kdu_model (str): Associated KDU model
2079 (str, str): Tuple with the Chart name in index 0, and the repo name
2080 in index 2; if there was a problem finding them, return None
2087 idx
= kdu_model
.find("/")
2088 if not self
._is
_helm
_chart
_a
_url
(kdu_model
) and idx
>= 0:
2089 chart_name
= kdu_model
[idx
+ 1 :]
2090 repo_name
= kdu_model
[:idx
]
2092 return chart_name
, repo_name
2094 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2095 """Obtain the Helm repository for an Helm Chart
2098 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2099 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2102 str: the repository URL; if Helm Chart is a local one, the function returns None
2105 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2109 # Find repository link
2110 local_repo_list
= await self
.repo_list(cluster_uuid
)
2111 for repo
in local_repo_list
:
2112 if repo
["name"] == repo_name
:
2113 repo_url
= repo
["url"]
2114 break # it is not necessary to continue the loop if the repo link was found...
2118 def _repo_to_oci_url(self
, repo
):
2119 db_repo
= self
.db
.get_one("k8srepos", {"name": repo
}, fail_on_empty
=False)
2120 if db_repo
and "oci" in db_repo
:
2121 return db_repo
.get("url")
2123 async def _prepare_helm_chart(self
, kdu_model
, cluster_id
):
2124 # e.g.: "stable/openldap", "1.0"
2125 kdu_model
, version
= self
._split
_version
(kdu_model
)
2126 # e.g.: "openldap, stable"
2127 chart_name
, repo
= self
._split
_repo
(kdu_model
)
2128 if repo
and chart_name
: # repo/chart case
2129 oci_url
= self
._repo
_to
_oci
_url
(repo
)
2130 if oci_url
: # oci does not require helm repo update
2131 kdu_model
= f
"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
2133 await self
.repo_update(cluster_id
, repo
)
2134 return kdu_model
, version
2136 async def create_certificate(
2137 self
, cluster_uuid
, namespace
, dns_prefix
, name
, secret_name
, usage
2139 paths
, env
= self
._init
_paths
_env
(
2140 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2142 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2143 await kubectl
.create_certificate(
2144 namespace
=namespace
,
2146 dns_prefix
=dns_prefix
,
2147 secret_name
=secret_name
,
2149 issuer_name
="ca-issuer",
2152 async def delete_certificate(self
, cluster_uuid
, namespace
, certificate_name
):
2153 paths
, env
= self
._init
_paths
_env
(
2154 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2156 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2157 await kubectl
.delete_certificate(namespace
, certificate_name
)
2159 async def create_namespace(
2166 Create a namespace in a specific cluster
2168 :param namespace: Namespace to be created
2169 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2170 :param labels: Dictionary with labels for the new namespace
2173 paths
, env
= self
._init
_paths
_env
(
2174 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2176 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2177 await kubectl
.create_namespace(
2182 async def delete_namespace(
2188 Delete a namespace in a specific cluster
2190 :param namespace: namespace to be deleted
2191 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2194 paths
, env
= self
._init
_paths
_env
(
2195 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2197 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2198 await kubectl
.delete_namespace(
2202 async def copy_secret_data(
2208 src_namespace
: str = "osm",
2209 dst_namespace
: str = "osm",
2212 Copy a single key and value from an existing secret to a new one
2214 :param src_secret: name of the existing secret
2215 :param dst_secret: name of the new secret
2216 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2217 :param data_key: key of the existing secret to be copied
2218 :param src_namespace: Namespace of the existing secret
2219 :param dst_namespace: Namespace of the new secret
2222 paths
, env
= self
._init
_paths
_env
(
2223 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2225 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2226 secret_data
= await kubectl
.get_secret_content(
2228 namespace
=src_namespace
,
2230 # Only the corresponding data_key value needs to be copy
2231 data
= {data_key
: secret_data
.get(data_key
)}
2232 await kubectl
.create_secret(
2235 namespace
=dst_namespace
,
2236 secret_type
="Opaque",
2239 async def setup_default_rbac(
2250 Create a basic RBAC for a new namespace.
2252 :param name: name of both Role and Role Binding
2253 :param namespace: K8s namespace
2254 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2255 :param api_groups: Api groups to be allowed in Policy Rule
2256 :param resources: Resources to be allowed in Policy Rule
2257 :param verbs: Verbs to be allowed in Policy Rule
2258 :param service_account: Service Account name used to bind the Role
2261 paths
, env
= self
._init
_paths
_env
(
2262 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2264 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2265 await kubectl
.create_role(
2268 namespace
=namespace
,
2269 api_groups
=api_groups
,
2270 resources
=resources
,
2273 await kubectl
.create_role_binding(
2276 namespace
=namespace
,
2278 sa_name
=service_account
,