2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
25 from shlex
import quote
33 from uuid
import uuid4
34 from urllib
.parse
import urlparse
36 from n2vc
.config
import EnvironConfig
37 from n2vc
.exceptions
import K8sException
38 from n2vc
.k8s_conn
import K8sConnector
39 from n2vc
.kubectl
import Kubectl
42 class K8sHelmBaseConnector(K8sConnector
):
45 ####################################################################################
46 ################################### P U B L I C ####################################
47 ####################################################################################
50 service_account
= "osm"
56 kubectl_command
: str = "/usr/bin/kubectl",
57 helm_command
: str = "/usr/bin/helm",
63 :param fs: file system for kubernetes and helm configuration
64 :param db: database object to write current operation status
65 :param kubectl_command: path to kubectl executable
66 :param helm_command: path to helm executable
68 :param on_update_db: callback called when k8s connector updates database
72 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
74 self
.log
.info("Initializing K8S Helm connector")
76 self
.config
= EnvironConfig()
77 # random numbers for release name generation
78 random
.seed(time
.time())
83 # exception if kubectl is not installed
84 self
.kubectl_command
= kubectl_command
85 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
87 # exception if helm is not installed
88 self
._helm
_command
= helm_command
89 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
91 # exception if main post renderer executable is not present
92 self
.main_post_renderer_path
= EnvironConfig(prefixes
=["OSMLCM_"]).get(
93 "mainpostrendererpath"
95 if self
.main_post_renderer_path
:
96 self
._check
_file
_exists
(
97 filename
=self
.main_post_renderer_path
, exception_if_not_exists
=True
100 # exception if podLabels post renderer executable is not present
101 self
.podLabels_post_renderer_path
= EnvironConfig(prefixes
=["OSMLCM_"]).get(
102 "podlabelspostrendererpath"
104 if self
.podLabels_post_renderer_path
:
105 self
._check
_file
_exists
(
106 filename
=self
.podLabels_post_renderer_path
, exception_if_not_exists
=True
109 # obtain stable repo url from config or apply default
110 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
111 if self
._stable
_repo
_url
== "None":
112 self
._stable
_repo
_url
= None
114 # Lock to avoid concurrent execution of helm commands
115 self
.cmd_lock
= asyncio
.Lock()
def _get_namespace(self, cluster_uuid: str) -> str:
    """
    Obtains the namespace used by the cluster with the uuid passed by argument

    :param cluster_uuid: cluster's uuid
    :return: value stored under the "namespace" key of the matching
        "k8sclusters" record, or None when no record (or no namespace) is found
    """

    # first, obtain the cluster corresponding to the uuid passed by argument
    # NOTE(review): fail_on_empty=False presumably makes get_one return an empty
    # result instead of raising when the cluster is unknown - confirm against the
    # db implementation. The result is therefore checked before it is read.
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    if not k8scluster:
        # unknown cluster: report "no namespace" instead of failing with
        # AttributeError on a None record
        return None
    return k8scluster.get("namespace")
133 namespace
: str = "kube-system",
134 reuse_cluster_uuid
=None,
136 ) -> tuple[str, bool]:
138 It prepares a given K8s cluster environment to run Charts
140 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
142 :param namespace: optional namespace to be used for helm. By default,
143 'kube-system' will be used
144 :param reuse_cluster_uuid: existing cluster uuid for reuse
145 :param kwargs: Additional parameters (None yet)
146 :return: uuid of the K8s cluster and True if connector has installed some
147 software in the cluster
148 (on error, an exception will be raised)
151 if reuse_cluster_uuid
:
152 cluster_id
= reuse_cluster_uuid
154 cluster_id
= str(uuid4())
157 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
160 paths
, env
= self
._init
_paths
_env
(
161 cluster_name
=cluster_id
, create_if_not_exist
=True
163 mode
= stat
.S_IRUSR | stat
.S_IWUSR
164 with
open(paths
["kube_config"], "w", mode
) as f
:
166 os
.chmod(paths
["kube_config"], 0o600)
168 # Code with initialization specific of helm version
169 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
171 # sync fs with local data
172 self
.fs
.reverse_sync(from_path
=cluster_id
)
174 self
.log
.info("Cluster {} initialized".format(cluster_id
))
176 return cluster_id
, n2vc_installed_sw
183 repo_type
: str = "chart",
186 password
: str = None,
190 "Cluster {}, adding {} repository {}. URL: {}".format(
191 cluster_uuid
, repo_type
, name
, url
196 paths
, env
= self
._init
_paths
_env
(
197 cluster_name
=cluster_uuid
, create_if_not_exist
=True
201 self
.fs
.sync(from_path
=cluster_uuid
)
204 if user
and password
:
205 host_port
= urlparse(url
).netloc
if url
.startswith("oci://") else url
206 # helm registry login url
207 command
= "env KUBECONFIG={} {} registry login {}".format(
208 paths
["kube_config"], self
._helm
_command
, quote(host_port
)
212 "OCI registry login is not needed for repo: {}".format(name
)
216 # helm repo add name url
217 command
= "env KUBECONFIG={} {} repo add {} {}".format(
218 paths
["kube_config"], self
._helm
_command
, quote(name
), quote(url
)
222 temp_cert_file
= os
.path
.join(
223 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
225 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
226 with
open(temp_cert_file
, "w") as the_cert
:
228 command
+= " --ca-file {}".format(quote(temp_cert_file
))
231 command
+= " --username={}".format(quote(user
))
234 command
+= " --password={}".format(quote(password
))
236 self
.log
.debug("adding repo: {}".format(command
))
237 await self
._local
_async
_exec
(
238 command
=command
, raise_exception_on_error
=True, env
=env
243 command
= "env KUBECONFIG={} {} repo update {}".format(
244 paths
["kube_config"], self
._helm
_command
, quote(name
)
246 self
.log
.debug("updating repo: {}".format(command
))
247 await self
._local
_async
_exec
(
248 command
=command
, raise_exception_on_error
=False, env
=env
252 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
254 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
256 "Cluster {}, updating {} repository {}".format(
257 cluster_uuid
, repo_type
, name
262 paths
, env
= self
._init
_paths
_env
(
263 cluster_name
=cluster_uuid
, create_if_not_exist
=True
267 self
.fs
.sync(from_path
=cluster_uuid
)
270 command
= "{} repo update {}".format(self
._helm
_command
, quote(name
))
271 self
.log
.debug("updating repo: {}".format(command
))
272 await self
._local
_async
_exec
(
273 command
=command
, raise_exception_on_error
=False, env
=env
277 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
279 async def repo_list(self
, cluster_uuid
: str) -> list:
281 Get the list of registered repositories
283 :return: list of registered repositories: [ (name, url) .... ]
286 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
289 paths
, env
= self
._init
_paths
_env
(
290 cluster_name
=cluster_uuid
, create_if_not_exist
=True
294 self
.fs
.sync(from_path
=cluster_uuid
)
296 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
297 paths
["kube_config"], self
._helm
_command
300 # Set exception to false because if there are no repos just want an empty list
301 output
, _rc
= await self
._local
_async
_exec
(
302 command
=command
, raise_exception_on_error
=False, env
=env
306 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
309 if output
and len(output
) > 0:
310 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
311 # unify format between helm2 and helm3 setting all keys lowercase
312 return self
._lower
_keys
_list
(repos
)
318 async def repo_remove(self
, cluster_uuid
: str, name
: str):
320 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
324 paths
, env
= self
._init
_paths
_env
(
325 cluster_name
=cluster_uuid
, create_if_not_exist
=True
329 self
.fs
.sync(from_path
=cluster_uuid
)
331 command
= "env KUBECONFIG={} {} repo remove {}".format(
332 paths
["kube_config"], self
._helm
_command
, quote(name
)
334 await self
._local
_async
_exec
(
335 command
=command
, raise_exception_on_error
=True, env
=env
339 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
345 uninstall_sw
: bool = False,
350 Resets the Kubernetes cluster by removing the helm deployment that represents it.
352 :param cluster_uuid: The UUID of the cluster to reset
353 :param force: Boolean to force the reset
354 :param uninstall_sw: Boolean to also uninstall the software installed by the connector in the cluster
355 :param kwargs: Additional parameters (None yet)
356 :return: Returns True if successful or raises an exception.
358 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
360 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
361 cluster_uuid
, uninstall_sw
366 self
.fs
.sync(from_path
=cluster_uuid
)
368 # uninstall releases if needed.
370 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
371 if len(releases
) > 0:
375 kdu_instance
= r
.get("name")
376 chart
= r
.get("chart")
378 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
380 await self
.uninstall(
381 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
383 except Exception as e
:
384 # will not raise exception as it was found
385 # that in some cases of previously installed helm releases it
388 "Error uninstalling release {}: {}".format(
394 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
395 ).format(cluster_uuid
)
398 False # Allow to remove k8s cluster without removing Tiller
402 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
404 # delete cluster directory
405 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
406 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
407 # Also remove the local directory if it still exists
408 direct
= self
.fs
.path
+ "/" + cluster_uuid
409 shutil
.rmtree(direct
, ignore_errors
=True)
def _is_helm_chart_a_file(self, chart_name: str):
    """
    Tells whether a chart reference is considered a file path.

    :param chart_name: chart reference to check
    :return: True when the reference contains more than one "/" separator
    """
    # N separators split the reference into N + 1 pieces, so "more than one
    # separator" is the same as "more than two pieces"
    pieces = chart_name.split("/")
    return len(pieces) > 2
def _is_helm_chart_a_url(chart_name: str):
    """
    Tells whether a chart reference looks like a URL.

    :param chart_name: chart reference to check
    :return: True when the parsed reference has both a scheme and a network
        location, False otherwise
    """
    parsed = urlparse(chart_name)
    if not parsed.scheme:
        return False
    return bool(parsed.netloc)
421 async def _install_impl(
429 timeout
: float = 300,
431 db_dict
: dict = None,
433 kdu_name
: str = None,
434 namespace
: str = None,
437 paths
, env
= self
._init
_paths
_env
(
438 cluster_name
=cluster_id
, create_if_not_exist
=True
442 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
443 cluster_id
=cluster_id
, params
=params
446 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_id
)
448 command
= self
._get
_install
_command
(
457 paths
["kube_config"],
460 self
.log
.debug("installing: {}".format(command
))
463 # exec helm in a task
464 exec_task
= asyncio
.ensure_future(
465 coro_or_future
=self
._local
_async
_exec
(
466 command
=command
, raise_exception_on_error
=False, env
=env
470 # write status in another task
471 status_task
= asyncio
.ensure_future(
472 coro_or_future
=self
._store
_status
(
473 cluster_id
=cluster_id
,
474 kdu_instance
=kdu_instance
,
481 # wait for execution task
482 await asyncio
.wait([exec_task
])
487 output
, rc
= exec_task
.result()
490 output
, rc
= await self
._local
_async
_exec
(
491 command
=command
, raise_exception_on_error
=False, env
=env
494 # remove temporal values yaml file
496 os
.remove(file_to_delete
)
499 await self
._store
_status
(
500 cluster_id
=cluster_id
,
501 kdu_instance
=kdu_instance
,
508 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
510 raise K8sException(msg
)
516 kdu_model
: str = None,
518 timeout
: float = 300,
520 db_dict
: dict = None,
521 namespace
: str = None,
522 reset_values
: bool = False,
523 reuse_values
: bool = True,
524 reset_then_reuse_values
: bool = False,
527 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
530 self
.fs
.sync(from_path
=cluster_uuid
)
532 # look for instance to obtain namespace
536 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
537 if not instance_info
:
538 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
539 namespace
= instance_info
["namespace"]
542 paths
, env
= self
._init
_paths
_env
(
543 cluster_name
=cluster_uuid
, create_if_not_exist
=True
547 self
.fs
.sync(from_path
=cluster_uuid
)
550 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
551 cluster_id
=cluster_uuid
, params
=params
554 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_uuid
)
557 if db_dict
and await self
._contains
_labels
(
558 kdu_instance
, namespace
, paths
["kube_config"], env
560 labels_dict
= await self
._labels
_dict
(db_dict
, kdu_instance
)
562 command
= self
._get
_upgrade
_command
(
571 paths
["kube_config"],
574 reset_then_reuse_values
,
578 self
.log
.debug("upgrading: {}".format(command
))
581 # exec helm in a task
582 exec_task
= asyncio
.ensure_future(
583 coro_or_future
=self
._local
_async
_exec
(
584 command
=command
, raise_exception_on_error
=False, env
=env
587 # write status in another task
588 status_task
= asyncio
.ensure_future(
589 coro_or_future
=self
._store
_status
(
590 cluster_id
=cluster_uuid
,
591 kdu_instance
=kdu_instance
,
598 # wait for execution task
599 await asyncio
.wait([exec_task
])
603 output
, rc
= exec_task
.result()
606 output
, rc
= await self
._local
_async
_exec
(
607 command
=command
, raise_exception_on_error
=False, env
=env
610 # remove temporal values yaml file
612 os
.remove(file_to_delete
)
615 await self
._store
_status
(
616 cluster_id
=cluster_uuid
,
617 kdu_instance
=kdu_instance
,
624 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
626 raise K8sException(msg
)
629 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
631 # return new revision number
632 instance
= await self
.get_instance_info(
633 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
636 revision
= int(instance
.get("revision"))
637 self
.log
.debug("New revision: {}".format(revision
))
647 total_timeout
: float = 1800,
648 cluster_uuid
: str = None,
649 kdu_model
: str = None,
651 db_dict
: dict = None,
654 """Scale a resource in a Helm Chart.
657 kdu_instance: KDU instance name
658 scale: Scale to which to set the resource
659 resource_name: Resource name
660 total_timeout: The time, in seconds, to wait
661 cluster_uuid: The UUID of the cluster
662 kdu_model: The chart reference
663 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
664 The --wait flag will be set automatically if --atomic is used
665 db_dict: Dictionary for any additional data
666 kwargs: Additional parameters
669 True if successful, False otherwise
672 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
674 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
675 resource_name
, kdu_model
, cluster_uuid
678 self
.log
.debug(debug_mgs
)
680 # look for instance to obtain namespace
681 # get_instance_info function calls the sync command
682 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
683 if not instance_info
:
684 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
687 paths
, env
= self
._init
_paths
_env
(
688 cluster_name
=cluster_uuid
, create_if_not_exist
=True
692 kdu_model
, version
= await self
._prepare
_helm
_chart
(kdu_model
, cluster_uuid
)
694 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
696 _
, replica_str
= await self
._get
_replica
_count
_url
(
697 kdu_model
, repo_url
, resource_name
701 if db_dict
and await self
._contains
_labels
(
702 kdu_instance
, instance_info
["namespace"], paths
["kube_config"], env
704 labels_dict
= await self
._labels
_dict
(db_dict
, kdu_instance
)
706 command
= self
._get
_upgrade
_scale
_command
(
709 instance_info
["namespace"],
717 paths
["kube_config"],
720 self
.log
.debug("scaling: {}".format(command
))
723 # exec helm in a task
724 exec_task
= asyncio
.ensure_future(
725 coro_or_future
=self
._local
_async
_exec
(
726 command
=command
, raise_exception_on_error
=False, env
=env
729 # write status in another task
730 status_task
= asyncio
.ensure_future(
731 coro_or_future
=self
._store
_status
(
732 cluster_id
=cluster_uuid
,
733 kdu_instance
=kdu_instance
,
734 namespace
=instance_info
["namespace"],
740 # wait for execution task
741 await asyncio
.wait([exec_task
])
745 output
, rc
= exec_task
.result()
748 output
, rc
= await self
._local
_async
_exec
(
749 command
=command
, raise_exception_on_error
=False, env
=env
753 await self
._store
_status
(
754 cluster_id
=cluster_uuid
,
755 kdu_instance
=kdu_instance
,
756 namespace
=instance_info
["namespace"],
762 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
764 raise K8sException(msg
)
767 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
771 async def get_scale_count(
779 """Get a resource scale count.
782 cluster_uuid: The UUID of the cluster
783 resource_name: Resource name
784 kdu_instance: KDU instance name
785 kdu_model: The name or path of an Helm Chart
786 kwargs: Additional parameters
789 Resource instance count
793 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
796 # look for instance to obtain namespace
797 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
798 if not instance_info
:
799 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
802 paths
, _
= self
._init
_paths
_env
(
803 cluster_name
=cluster_uuid
, create_if_not_exist
=True
806 replicas
= await self
._get
_replica
_count
_instance
(
807 kdu_instance
=kdu_instance
,
808 namespace
=instance_info
["namespace"],
809 kubeconfig
=paths
["kube_config"],
810 resource_name
=resource_name
,
814 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
817 # Get default value if scale count is not found from provided values
818 # Important note: this piece of code shall only be executed in the first scaling operation,
819 # since it is expected that the _get_replica_count_instance is able to obtain the number of
820 # replicas when a scale operation was already conducted previously for this KDU/resource!
822 repo_url
= await self
._find
_repo
(
823 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
825 replicas
, _
= await self
._get
_replica
_count
_url
(
826 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
830 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
831 f
"{resource_name} obtained: {replicas}"
835 msg
= "Replica count not found. Cannot be scaled"
837 raise K8sException(msg
)
842 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
845 "rollback kdu_instance {} to revision {} from cluster {}".format(
846 kdu_instance
, revision
, cluster_uuid
851 self
.fs
.sync(from_path
=cluster_uuid
)
853 # look for instance to obtain namespace
854 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
855 if not instance_info
:
856 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
859 paths
, env
= self
._init
_paths
_env
(
860 cluster_name
=cluster_uuid
, create_if_not_exist
=True
864 self
.fs
.sync(from_path
=cluster_uuid
)
866 command
= self
._get
_rollback
_command
(
867 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
870 self
.log
.debug("rolling_back: {}".format(command
))
872 # exec helm in a task
873 exec_task
= asyncio
.ensure_future(
874 coro_or_future
=self
._local
_async
_exec
(
875 command
=command
, raise_exception_on_error
=False, env
=env
878 # write status in another task
879 status_task
= asyncio
.ensure_future(
880 coro_or_future
=self
._store
_status
(
881 cluster_id
=cluster_uuid
,
882 kdu_instance
=kdu_instance
,
883 namespace
=instance_info
["namespace"],
885 operation
="rollback",
889 # wait for execution task
890 await asyncio
.wait([exec_task
])
895 output
, rc
= exec_task
.result()
898 await self
._store
_status
(
899 cluster_id
=cluster_uuid
,
900 kdu_instance
=kdu_instance
,
901 namespace
=instance_info
["namespace"],
903 operation
="rollback",
907 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
909 raise K8sException(msg
)
912 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
914 # return new revision number
915 instance
= await self
.get_instance_info(
916 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
919 revision
= int(instance
.get("revision"))
920 self
.log
.debug("New revision: {}".format(revision
))
925 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
927 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
928 (this call should happen after all _terminate-config-primitive_ of the VNF
931 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
932 :param kdu_instance: unique name for the KDU instance to be deleted
933 :param kwargs: Additional parameters (None yet)
934 :return: True if successful
938 "uninstall kdu_instance {} from cluster {}".format(
939 kdu_instance
, cluster_uuid
944 self
.fs
.sync(from_path
=cluster_uuid
)
946 # look for instance to obtain namespace
947 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
948 if not instance_info
:
949 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
952 paths
, env
= self
._init
_paths
_env
(
953 cluster_name
=cluster_uuid
, create_if_not_exist
=True
957 self
.fs
.sync(from_path
=cluster_uuid
)
959 command
= self
._get
_uninstall
_command
(
960 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
962 output
, _rc
= await self
._local
_async
_exec
(
963 command
=command
, raise_exception_on_error
=True, env
=env
967 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
969 return self
._output
_to
_table
(output
)
971 async def instances_list(self
, cluster_uuid
: str) -> list:
973 returns a list of deployed releases in a cluster
975 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
979 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
982 self
.fs
.sync(from_path
=cluster_uuid
)
984 # execute internal command
985 result
= await self
._instances
_list
(cluster_uuid
)
988 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
992 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
993 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
994 for instance
in instances
:
995 if instance
.get("name") == kdu_instance
:
997 self
.log
.debug("Instance {} not found".format(kdu_instance
))
1000 async def upgrade_charm(
1004 charm_id
: str = None,
1005 charm_type
: str = None,
1006 timeout
: float = None,
1008 """This method upgrade charms in VNFs
1011 ee_id: Execution environment id
1012 path: Local path to the charm
1014 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
1015 timeout: (Float) Timeout for the ns update operation
1018 The output of the update operation if status equals to "completed"
1020 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
1022 async def exec_primitive(
1024 cluster_uuid
: str = None,
1025 kdu_instance
: str = None,
1026 primitive_name
: str = None,
1027 timeout
: float = 300,
1028 params
: dict = None,
1029 db_dict
: dict = None,
1032 """Exec primitive (Juju action)
1034 :param cluster_uuid: The UUID of the cluster or namespace:cluster
1035 :param kdu_instance: The unique name of the KDU instance
1036 :param primitive_name: Name of action that will be executed
1037 :param timeout: Timeout for action execution
1038 :param params: Dictionary of all the parameters needed for the action
1039 :param db_dict: Dictionary for any additional data
1040 :param kwargs: Additional parameters (None yet)
1042 :return: Returns the output of the action
1045 "KDUs deployed with Helm don't support actions "
1046 "different from rollback, upgrade and status"
1049 async def get_services(
1050 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
1053 Returns a list of services defined for the specified kdu instance.
1055 :param cluster_uuid: UUID of a K8s cluster known by OSM
1056 :param kdu_instance: unique name for the KDU instance
1057 :param namespace: K8s namespace used by the KDU instance
1058 :return: If successful, it will return a list of services, Each service
1059 can have the following data:
1060 - `name` of the service
1061 - `type` type of service in the k8 cluster
1062 - `ports` List of ports offered by the service, for each port includes at least
1063 name, port, protocol
1064 - `cluster_ip` Internal ip to be used inside k8s cluster
1065 - `external_ip` List of external ips (in case they are available)
1069 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1070 cluster_uuid
, kdu_instance
1075 paths
, env
= self
._init
_paths
_env
(
1076 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1080 self
.fs
.sync(from_path
=cluster_uuid
)
1082 # get list of services names for kdu
1083 service_names
= await self
._get
_services
(
1084 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1088 for service
in service_names
:
1089 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1090 service_list
.append(service
)
1093 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1097 async def get_service(
1098 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1101 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1102 service_name
, namespace
, cluster_uuid
1107 self
.fs
.sync(from_path
=cluster_uuid
)
1109 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1112 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1116 async def status_kdu(
1117 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1118 ) -> Union
[str, dict]:
1120 This call would retrieve the current state of a given KDU instance. It
1121 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1122 values_ of the configuration parameters applied to a given instance. This call
1123 would be based on the `status` call.
1125 :param cluster_uuid: UUID of a K8s cluster known by OSM
1126 :param kdu_instance: unique name for the KDU instance
1127 :param kwargs: Additional parameters (None yet)
1128 :param yaml_format: if the return shall be returned as an YAML string or as a
1130 :return: If successful, it will return the following vector of arguments:
1131 - K8s `namespace` in the cluster where the KDU lives
1132 - `state` of the KDU instance. It can be:
1139 - List of `resources` (objects) that this release consists of, sorted by kind,
1140 and the status of those resources
1141 - Last `deployment_time`.
1145 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1146 cluster_uuid
, kdu_instance
1151 self
.fs
.sync(from_path
=cluster_uuid
)
1153 # get instance: needed to obtain namespace
1154 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1155 for instance
in instances
:
1156 if instance
.get("name") == kdu_instance
:
1159 # instance does not exist
1161 "Instance name: {} not found in cluster: {}".format(
1162 kdu_instance
, cluster_uuid
1166 status
= await self
._status
_kdu
(
1167 cluster_id
=cluster_uuid
,
1168 kdu_instance
=kdu_instance
,
1169 namespace
=instance
["namespace"],
1170 yaml_format
=yaml_format
,
1171 show_error_log
=True,
1175 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1179 async def get_values_kdu(
1180 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1182 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1184 return await self
._exec
_get
_command
(
1185 get_command
="values",
1186 kdu_instance
=kdu_instance
,
1187 namespace
=namespace
,
1188 kubeconfig
=kubeconfig
,
1191 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1192 """Method to obtain the Helm Chart package's values
1195 kdu_model: The name or path of an Helm Chart
1196 repo_url: Helm Chart repository url
1199 str: the values of the Helm Chart package
1203 "inspect kdu_model values {} from (optional) repo: {}".format(
1208 return await self
._exec
_inspect
_command
(
1209 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1212 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1214 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1217 return await self
._exec
_inspect
_command
(
1218 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1221 async def synchronize_repos(self
, cluster_uuid
: str):
1222 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1224 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1225 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1227 local_repo_list
= await self
.repo_list(cluster_uuid
)
1228 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1230 deleted_repo_list
= []
1231 added_repo_dict
= {}
1233 # iterate over the list of repos in the database that should be
1234 # added if not present
1235 for repo_name
, db_repo
in db_repo_dict
.items():
1237 # check if it is already present
1238 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1239 repo_id
= db_repo
.get("_id")
1240 if curr_repo_url
!= db_repo
["url"]:
1243 "repo {} url changed, delete and and again".format(
1247 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1248 deleted_repo_list
.append(repo_id
)
1251 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1252 await self
.repo_add(
1256 cert
=db_repo
.get("ca_cert"),
1257 user
=db_repo
.get("user"),
1258 password
=db_repo
.get("password"),
1259 oci
=db_repo
.get("oci", False),
1261 added_repo_dict
[repo_id
] = db_repo
["name"]
1262 except Exception as e
:
1264 "Error adding repo id: {}, err_msg: {} ".format(
1269 # Delete repos that are present but not in nbi_list
1270 for repo_name
in local_repo_dict
:
1271 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1272 self
.log
.debug("delete repo {}".format(repo_name
))
1274 await self
.repo_remove(cluster_uuid
, repo_name
)
1275 deleted_repo_list
.append(repo_name
)
1276 except Exception as e
:
1278 "Error deleting repo, name: {}, err_msg: {}".format(
1283 return deleted_repo_list
, added_repo_dict
1285 except K8sException
:
1287 except Exception as e
:
1288 # Log any unexpected error and re-raise it as a generic exception
1289 self
.log
.error("Error synchronizing repos: {}".format(e
))
1290 raise Exception("Error synchronizing repos: {}".format(e
))
def _get_db_repos_dict(self, repo_ids: list):
    """
    Builds a dictionary with the database records of the given repositories

    :param repo_ids: list of "_id" values to look up in the "k8srepos" collection
    :return: dictionary mapping each repository "name" to its database record
    """
    # the accumulator must exist before the loop writes into it; an empty
    # repo_ids list therefore yields an empty dictionary
    db_repos_dict = {}
    for repo_id in repo_ids:
        db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
        # records are indexed by name; a later record with the same name
        # replaces an earlier one
        db_repos_dict[db_repo["name"]] = db_repo
    return db_repos_dict
1300 ####################################################################################
1301 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1302 ####################################################################################
1306 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1308 Creates and returns base cluster and kube dirs and returns them.
1309 Also creates helm3 dirs according to new directory specification, paths are
1310 not returned but assigned to helm environment variables
1312 :param cluster_name: cluster_name
1313 :return: Dictionary with config_paths and dictionary with helm environment variables
1317 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1319 Implements the helm version dependent cluster initialization
1323 async def _instances_list(self
, cluster_id
):
1325 Implements the helm version dependent helm instances list
1329 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1331 Implements the helm version dependent method to obtain services from a helm instance
1335 async def _status_kdu(
1339 namespace
: str = None,
1340 yaml_format
: bool = False,
1341 show_error_log
: bool = False,
1342 ) -> Union
[str, dict]:
1344 Implements the helm version dependent method to obtain status of a helm instance
1348 def _get_install_command(
1361 Obtain command to be executed to install the indicated instance
1365 def _get_upgrade_scale_command(
1379 """Generates the command to scale a Helm Chart release
1382 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1383 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1384 namespace (str): Namespace where this KDU instance is deployed
1385 scale (int): Scale count
1386 version (str): Constraint with specific version of the Chart to use
1387 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1388 The --wait flag will be set automatically if --atomic is used
1389 replica_str (str): The key under resource_name key where the scale count is stored
1390 timeout (float): The time, in seconds, to wait
1391 resource_name (str): The KDU's resource to scale
1392 kubeconfig (str): Kubeconfig file path
1395 str: command to scale a Helm Chart release
1399 def _get_upgrade_command(
1412 reset_then_reuse_values
,
1415 """Generates the command to upgrade a Helm Chart release
1418 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1419 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1420 namespace (str): Namespace where this KDU instance is deployed
1421 params_str (str): Params used to upgrade the Helm Chart release
1422 version (str): Constraint with specific version of the Chart to use
1423 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1424 The --wait flag will be set automatically if --atomic is used
1425 timeout (float): The time, in seconds, to wait
1426 kubeconfig (str): Kubeconfig file path
1427 reset_values(bool): If set, helm resets values instead of reusing previous values.
1428 reuse_values(bool): If set, helm reuses previous values.
1429 reset_then_reuse_values(bool): If set, helm resets values, then apply the last release's values
1430 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1432 str: command to upgrade a Helm Chart release
1436 def _get_rollback_command(
1437 self
, kdu_instance
, namespace
, revision
, kubeconfig
1440 Obtain command to be executed to rollback the indicated instance
1444 def _get_uninstall_command(
1445 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1448 Obtain command to be executed to delete the indicated instance
1452 def _get_inspect_command(
1453 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1455 """Generates the command to obtain the information about an Helm Chart package
1456 (´helm show ...´ command)
1459 show_command: the second part of the command (`helm show <show_command>`)
1460 kdu_model: The name or path of an Helm Chart
1461 repo_url: Helm Chart repository url
1462 version: constraint with specific version of the Chart to use
1465 str: the generated Helm Chart command
1469 def _get_get_command(
1470 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1472 """Obtain command to be executed to get information about the kdu instance."""
1475 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1477 Method call to uninstall cluster software for helm. This method is dependent
1479 For Helm v2 it will be called when Tiller must be uninstalled
1480 For Helm v3 it does nothing and does not need to be called
1484 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1486 Obtains the cluster repos identifiers
1490 ####################################################################################
1491 ################################### P R I V A T E ##################################
1492 ####################################################################################
1496 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1497 if os
.path
.exists(filename
):
1500 msg
= "File {} does not exist".format(filename
)
1501 if exception_if_not_exists
:
1502 raise K8sException(msg
)
1505 def _remove_multiple_spaces(strobj
):
1506 strobj
= strobj
.strip()
1507 while " " in strobj
:
1508 strobj
= strobj
.replace(" ", " ")
1512 def _output_to_lines(output
: str) -> list:
1513 output_lines
= list()
1514 lines
= output
.splitlines(keepends
=False)
1518 output_lines
.append(line
)
1522 def _output_to_table(output
: str) -> list:
1523 output_table
= list()
1524 lines
= output
.splitlines(keepends
=False)
1526 line
= line
.replace("\t", " ")
1528 output_table
.append(line_list
)
1529 cells
= line
.split(sep
=" ")
1533 line_list
.append(cell
)
1537 def _parse_services(output
: str) -> list:
1538 lines
= output
.splitlines(keepends
=False)
1541 line
= line
.replace("\t", " ")
1542 cells
= line
.split(sep
=" ")
1543 if len(cells
) > 0 and cells
[0].startswith("service/"):
1544 elems
= cells
[0].split(sep
="/")
1546 services
.append(elems
[1])
1550 def _get_deep(dictionary
: dict, members
: tuple):
1555 value
= target
.get(m
)
1564 # find key:value in several lines
1566 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1567 for line
in p_lines
:
1569 if line
.startswith(p_key
+ ":"):
1570 parts
= line
.split(":")
1571 the_value
= parts
[1].strip()
1579 def _lower_keys_list(input_list
: list):
1581 Transform the keys in a list of dictionaries to lower case and returns a new list
1586 for dictionary
in input_list
:
1587 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1588 new_list
.append(new_dict
)
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> tuple[str, int]:
    """Run a local command as a subprocess (no shell) and collect its output.

    The command string is normalised, split with ``shlex`` and executed while
    holding ``self.cmd_lock``. The subprocess environment is a copy of
    ``os.environ`` updated with ``env``.

    :param command: command line to execute
    :param raise_exception_on_error: raise K8sException instead of returning when
        the command exits with a non-zero code or cannot be executed
    :param show_error_log: log the output together with a non-zero return code
    :param encode_utf8: return the ``str()`` form of the UTF-8 encoded output, with
        escaped newlines turned back into real newlines
    :param env: extra environment variables for the subprocess
    :return: tuple ``(output, return_code)``; the output is what the command wrote
        to stderr if anything, otherwise what it wrote to stdout. ``("", -1)`` when
        the execution itself failed and ``raise_exception_on_error`` is not set
    :raises K8sException: on failure, when ``raise_exception_on_error`` is set
    """
    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # Bound before the try block: the task can be cancelled while it is still
    # waiting for the lock or spawning, and the handler below must be able to
    # tell that no process exists yet.
    process = None
    try:
        async with self.cmd_lock:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=environ,
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            # anything written to stderr replaces the stdout text
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        # first, kill the process if it was started and is still running
        if process is not None and process.returncode is None:
            process.kill()
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """Run ``command1 | command2`` locally (no shell) and collect the output.

    Both commands are normalised, split with ``shlex`` and started while holding
    ``self.cmd_lock``; the standard output of the first one is connected to the
    standard input of the second one through an OS pipe. The subprocess
    environment is a copy of ``os.environ`` updated with ``env``.

    :param command1: command line producing the data
    :param command2: command line consuming the data
    :param raise_exception_on_error: raise K8sException instead of returning when
        the second command exits with a non-zero code or the execution fails
    :param show_error_log: log the output together with a non-zero return code
    :param encode_utf8: return the ``str()`` form of the UTF-8 encoded output, with
        escaped newlines turned back into real newlines
    :param env: extra environment variables for both subprocesses
    :return: tuple ``(output, return_code)`` of the second command; ``("", -1)``
        when the execution itself failed and ``raise_exception_on_error`` is not set
    :raises K8sException: on failure, when ``raise_exception_on_error`` is set
    """
    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # Bound before the try block so the cancellation handler can tell which
    # processes, if any, were actually started.
    process_1 = None
    process_2 = None
    try:
        async with self.cmd_lock:
            read, write = os.pipe()
            try:
                try:
                    process_1 = await asyncio.create_subprocess_exec(
                        *command1, stdout=write, env=environ
                    )
                finally:
                    # The child has its own copy of the write end; ours must be
                    # closed (also when the spawn fails) or the reader never
                    # sees end-of-file.
                    os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
            finally:
                os.close(read)
            stdout, stderr = await process_2.communicate()

            return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        # first, kill the processes that were started and are still running
        for process in (process_1, process_2):
            if process is not None and process.returncode is None:
                process.kill()
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, and empty while no ip is assigned
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command,
        paths["kube_config"],
        quote(namespace),
        quote(service_name),
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # The ingress list is missing until the load balancer is provisioned,
        # and an entry may carry no "ip" key; neither case may break the
        # retrieval of the remaining service data.
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        service["external_ip"] = [
            elem["ip"] for elem in (ip_map_list or []) if "ip" in elem
        ]

    return service
1789 async def _exec_get_command(
1790 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1792 """Obtains information about the kdu instance."""
1794 full_command
= self
._get
_get
_command
(
1795 get_command
, kdu_instance
, namespace
, kubeconfig
1798 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1802 async def _exec_inspect_command(
1803 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1805 """Obtains information about an Helm Chart package (´helm show´ command)
1808 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1809 kdu_model: The name or path of an Helm Chart
1810 repo_url: Helm Chart repository url
1813 str: the requested info about the Helm Chart package
1818 repo_str
= " --repo {}".format(quote(repo_url
))
1820 # Obtain the Chart's name and store it in the var kdu_model
1821 kdu_model
, _
= self
._split
_repo
(kdu_model
=kdu_model
)
1823 kdu_model
, version
= self
._split
_version
(kdu_model
)
1825 version_str
= "--version {}".format(quote(version
))
1829 full_command
= self
._get
_inspect
_command
(
1830 show_command
=inspect_command
,
1831 kdu_model
=quote(kdu_model
),
1833 version
=version_str
,
1836 output
, _
= await self
._local
_async
_exec
(command
=full_command
)
1840 async def _get_replica_count_url(
1843 repo_url
: str = None,
1844 resource_name
: str = None,
1845 ) -> tuple[int, str]:
1846 """Get the replica count value in the Helm Chart Values.
1849 kdu_model: The name or path of an Helm Chart
1850 repo_url: Helm Chart repository url
1851 resource_name: Resource name
1855 - The number of replicas of the specific instance; if not found, returns None; and
1856 - The string corresponding to the replica count key in the Helm values
1859 kdu_values
= yaml
.load(
1860 await self
.values_kdu(kdu_model
=kdu_model
, repo_url
=repo_url
),
1861 Loader
=yaml
.SafeLoader
,
1864 self
.log
.debug(f
"Obtained the Helm package values for the KDU: {kdu_values}")
1868 "kdu_values not found for kdu_model {}".format(kdu_model
)
1872 kdu_values
= kdu_values
.get(resource_name
, None)
1875 msg
= "resource {} not found in the values in model {}".format(
1876 resource_name
, kdu_model
1879 raise K8sException(msg
)
1881 duplicate_check
= False
1886 if kdu_values
.get("replicaCount") is not None:
1887 replicas
= kdu_values
["replicaCount"]
1888 replica_str
= "replicaCount"
1889 elif kdu_values
.get("replicas") is not None:
1890 duplicate_check
= True
1891 replicas
= kdu_values
["replicas"]
1892 replica_str
= "replicas"
1896 "replicaCount or replicas not found in the resource"
1897 "{} values in model {}. Cannot be scaled".format(
1898 resource_name
, kdu_model
1903 "replicaCount or replicas not found in the values"
1904 "in model {}. Cannot be scaled".format(kdu_model
)
1907 raise K8sException(msg
)
1909 # Control if replicas and replicaCount exists at the same time
1910 msg
= "replicaCount and replicas are exists at the same time"
1912 if "replicaCount" in kdu_values
:
1914 raise K8sException(msg
)
1916 if "replicas" in kdu_values
:
1918 raise K8sException(msg
)
1920 return replicas
, replica_str
1922 async def _get_replica_count_instance(
1927 resource_name
: str = None,
1929 """Get the replica count value in the instance.
1932 kdu_instance: The name of the KDU instance
1933 namespace: KDU instance namespace
1935 resource_name: Resource name
1938 The number of replicas of the specific instance; if not found, returns None
1941 kdu_values
= yaml
.load(
1942 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1943 Loader
=yaml
.SafeLoader
,
1946 self
.log
.debug(f
"Obtained the Helm values for the KDU instance: {kdu_values}")
1952 kdu_values
.get(resource_name
, None) if resource_name
else None
1955 for replica_str
in ("replicaCount", "replicas"):
1957 replicas
= resource_values
.get(replica_str
)
1959 replicas
= kdu_values
.get(replica_str
)
1961 if replicas
is not None:
1966 async def _labels_dict(self
, db_dict
, kdu_instance
):
1967 # get the network service registry
1968 ns_id
= db_dict
["filter"]["_id"]
1970 db_nsr
= self
.db
.get_one("nsrs", {"_id": ns_id
})
1971 except Exception as e
:
1972 print("nsr {} not found: {}".format(ns_id
, e
))
1973 nsd_id
= db_nsr
["nsd"]["_id"]
1975 # get the kdu registry
1976 for index
, kdu
in enumerate(db_nsr
["_admin"]["deployed"]["K8s"]):
1977 if kdu
["kdu-instance"] == kdu_instance
:
1981 # No kdur found, could be the case of an EE chart
1984 kdu_name
= db_kdur
["kdu-name"]
1985 member_vnf_index
= db_kdur
["member-vnf-index"]
1986 # get the vnf registry
1988 db_vnfr
= self
.db
.get_one(
1990 {"nsr-id-ref": ns_id
, "member-vnf-index-ref": member_vnf_index
},
1992 except Exception as e
:
1993 print("vnfr {} not found: {}".format(member_vnf_index
, e
))
1995 vnf_id
= db_vnfr
["_id"]
1996 vnfd_id
= db_vnfr
["vnfd-id"]
1999 "managed-by": "osm.etsi.org",
2000 "osm.etsi.org/ns-id": ns_id
,
2001 "osm.etsi.org/nsd-id": nsd_id
,
2002 "osm.etsi.org/vnf-id": vnf_id
,
2003 "osm.etsi.org/vnfd-id": vnfd_id
,
2004 "osm.etsi.org/kdu-id": kdu_instance
,
2005 "osm.etsi.org/kdu-name": kdu_name
,
2008 async def _contains_labels(self
, kdu_instance
, namespace
, kube_config
, env
):
2009 command
= "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
2012 quote(kdu_instance
),
2015 output
, rc
= await self
._local
_async
_exec
(
2016 command
=command
, raise_exception_on_error
=False, env
=env
2018 manifests
= yaml
.safe_load_all(output
)
2019 for manifest
in manifests
:
2020 # Check if the manifest has metadata and labels
2022 manifest
is not None
2023 and "metadata" in manifest
2024 and "labels" in manifest
["metadata"]
2028 "osm.etsi.org/kdu-id",
2029 "osm.etsi.org/kdu-name",
2030 "osm.etsi.org/ns-id",
2031 "osm.etsi.org/nsd-id",
2032 "osm.etsi.org/vnf-id",
2033 "osm.etsi.org/vnfd-id",
2035 if labels
.issubset(manifest
["metadata"]["labels"].keys()):
2039 async def _store_status(
2044 namespace
: str = None,
2045 db_dict
: dict = None,
2048 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
2050 :param cluster_id (str): the cluster where the KDU instance is deployed
2051 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
2052 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
2053 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
2054 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
2055 values for the keys:
2056 - "collection": The Mongo DB collection to write to
2057 - "filter": The query filter to use in the update process
2058 - "path": The dot separated keys which targets the object to be updated
2063 detailed_status
= await self
._status
_kdu
(
2064 cluster_id
=cluster_id
,
2065 kdu_instance
=kdu_instance
,
2067 namespace
=namespace
,
2070 status
= detailed_status
.get("info").get("description")
2071 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
2073 # write status to db
2074 result
= await self
.write_app_status_to_db(
2077 detailed_status
=str(detailed_status
),
2078 operation
=operation
,
2082 self
.log
.info("Error writing in database. Task exiting...")
2084 except asyncio
.CancelledError
as e
:
2086 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
2088 except Exception as e
:
2089 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
2091 # params for use in -f file
2092 # returns values file option and filename (in order to delete it at the end)
2093 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> tuple[str, str]:
2094 if params
and len(params
) > 0:
2095 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
2097 def get_random_number():
2098 r
= random
.SystemRandom().randint(1, 99999999)
2106 value
= params
.get(key
)
2107 if "!!yaml" in str(value
):
2108 value
= yaml
.safe_load(value
[7:])
2109 params2
[key
] = value
2111 values_file
= get_random_number() + ".yaml"
2112 with
open(values_file
, "w") as stream
:
2113 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
2115 return "-f {}".format(values_file
), values_file
2119 # params for use in --set option
2121 def _params_to_set_option(params
: dict) -> str:
2123 f
"{quote(str(key))}={quote(str(value))}"
2124 for key
, value
in params
.items()
2125 if value
is not None
2129 return "--set " + ",".join(pairs
)
2132 def generate_kdu_instance_name(**kwargs
):
2133 chart_name
= kwargs
["kdu_model"]
2134 # check embeded chart (file or dir)
2135 if chart_name
.startswith("/"):
2136 # extract file or directory name
2137 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
2139 elif "://" in chart_name
:
2140 # extract last portion of URL
2141 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
2144 for c
in chart_name
:
2145 if c
.isalpha() or c
.isnumeric():
2152 # if does not start with alpha character, prefix 'a'
2153 if not name
[0].isalpha():
2158 def get_random_number():
2159 r
= random
.SystemRandom().randint(1, 99999999)
2161 s
= s
.rjust(10, "0")
2164 name
= name
+ get_random_number()
2167 def _split_version(self
, kdu_model
: str) -> tuple[str, str]:
2171 self
._is
_helm
_chart
_a
_file
(kdu_model
)
2172 or self
._is
_helm
_chart
_a
_url
(kdu_model
)
2174 and ":" in kdu_model
2176 parts
= kdu_model
.split(sep
=":")
2178 version
= str(parts
[1])
2179 kdu_model
= parts
[0]
2180 return kdu_model
, version
2182 def _split_repo(self
, kdu_model
: str) -> tuple[str, str]:
2183 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2186 kdu_model (str): Associated KDU model
2189 (str, str): Tuple with the Chart name in index 0, and the repo name
2190 in index 2; if there was a problem finding them, return None
2197 idx
= kdu_model
.find("/")
2198 if not self
._is
_helm
_chart
_a
_url
(kdu_model
) and idx
>= 0:
2199 chart_name
= kdu_model
[idx
+ 1 :]
2200 repo_name
= kdu_model
[:idx
]
2202 return chart_name
, repo_name
2204 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2205 """Obtain the Helm repository for an Helm Chart
2208 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2209 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2212 str: the repository URL; if Helm Chart is a local one, the function returns None
2215 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2219 # Find repository link
2220 local_repo_list
= await self
.repo_list(cluster_uuid
)
2221 for repo
in local_repo_list
:
2222 if repo
["name"] == repo_name
:
2223 repo_url
= repo
["url"]
2224 break # it is not necessary to continue the loop if the repo link was found...
2228 def _repo_to_oci_url(self
, repo
):
2229 db_repo
= self
.db
.get_one("k8srepos", {"name": repo
}, fail_on_empty
=False)
2230 if db_repo
and "oci" in db_repo
:
2231 return db_repo
.get("url")
2233 async def _prepare_helm_chart(self
, kdu_model
, cluster_id
):
2234 # e.g.: "stable/openldap", "1.0"
2235 kdu_model
, version
= self
._split
_version
(kdu_model
)
2236 # e.g.: "openldap, stable"
2237 chart_name
, repo
= self
._split
_repo
(kdu_model
)
2238 if repo
and chart_name
: # repo/chart case
2239 oci_url
= self
._repo
_to
_oci
_url
(repo
)
2240 if oci_url
: # oci does not require helm repo update
2241 kdu_model
= f
"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
2243 await self
.repo_update(cluster_id
, repo
)
2244 return kdu_model
, version
2246 async def create_certificate(
2247 self
, cluster_uuid
, namespace
, dns_prefix
, name
, secret_name
, usage
2249 paths
, env
= self
._init
_paths
_env
(
2250 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2252 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2253 await kubectl
.create_certificate(
2254 namespace
=namespace
,
2256 dns_prefix
=dns_prefix
,
2257 secret_name
=secret_name
,
2259 issuer_name
="ca-issuer",
async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
    """Delete a certificate from a namespace of a specific cluster.

    :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
    :param namespace: namespace the certificate is deleted from
    :param certificate_name: name of the certificate to delete
    """
    # Only the kubeconfig path is needed; the helm environment returned
    # alongside it is not used by this operation.
    paths, _ = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )
    kubectl = Kubectl(config_file=paths["kube_config"])
    await kubectl.delete_certificate(namespace, certificate_name)
2269 async def create_namespace(
2276 Create a namespace in a specific cluster
2278 :param namespace: Namespace to be created
2279 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2280 :param labels: Dictionary with labels for the new namespace
2283 paths
, env
= self
._init
_paths
_env
(
2284 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2286 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2287 await kubectl
.create_namespace(
2292 async def delete_namespace(
2298 Delete a namespace in a specific cluster
2300 :param namespace: namespace to be deleted
2301 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2304 paths
, env
= self
._init
_paths
_env
(
2305 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2307 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2308 await kubectl
.delete_namespace(
2312 async def copy_secret_data(
2318 src_namespace
: str = "osm",
2319 dst_namespace
: str = "osm",
2322 Copy a single key and value from an existing secret to a new one
2324 :param src_secret: name of the existing secret
2325 :param dst_secret: name of the new secret
2326 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2327 :param data_key: key of the existing secret to be copied
2328 :param src_namespace: Namespace of the existing secret
2329 :param dst_namespace: Namespace of the new secret
2332 paths
, env
= self
._init
_paths
_env
(
2333 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2335 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2336 secret_data
= await kubectl
.get_secret_content(
2338 namespace
=src_namespace
,
2340 # Only the corresponding data_key value needs to be copy
2341 data
= {data_key
: secret_data
.get(data_key
)}
2342 await kubectl
.create_secret(
2345 namespace
=dst_namespace
,
2346 secret_type
="Opaque",
2349 async def setup_default_rbac(
2360 Create a basic RBAC for a new namespace.
2362 :param name: name of both Role and Role Binding
2363 :param namespace: K8s namespace
2364 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2365 :param api_groups: Api groups to be allowed in Policy Rule
2366 :param resources: Resources to be allowed in Policy Rule
2367 :param verbs: Verbs to be allowed in Policy Rule
2368 :param service_account: Service Account name used to bind the Role
2371 paths
, env
= self
._init
_paths
_env
(
2372 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2374 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2375 await kubectl
.create_role(
2378 namespace
=namespace
,
2379 api_groups
=api_groups
,
2380 resources
=resources
,
2383 await kubectl
.create_role_binding(
2386 namespace
=namespace
,
2388 sa_name
=service_account
,