2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
25 from shlex
import quote
33 from uuid
import uuid4
35 from n2vc
.config
import EnvironConfig
36 from n2vc
.exceptions
import K8sException
37 from n2vc
.k8s_conn
import K8sConnector
38 from n2vc
.kubectl
import Kubectl
41 class K8sHelmBaseConnector(K8sConnector
):
44 ####################################################################################
45 ################################### P U B L I C ####################################
46 ####################################################################################
49 service_account
= "osm"
55 kubectl_command
: str = "/usr/bin/kubectl",
56 helm_command
: str = "/usr/bin/helm",
62 :param fs: file system for kubernetes and helm configuration
63 :param db: database object to write current operation status
64 :param kubectl_command: path to kubectl executable
65 :param helm_command: path to helm executable
67 :param on_update_db: callback called when k8s connector updates database
71 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
73 self
.log
.info("Initializing K8S Helm connector")
75 self
.config
= EnvironConfig()
76 # random numbers for release name generation
77 random
.seed(time
.time())
82 # exception if kubectl is not installed
83 self
.kubectl_command
= kubectl_command
84 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
86 # exception if helm is not installed
87 self
._helm
_command
= helm_command
88 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
90 # obtain stable repo url from config or apply default
91 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
92 if self
._stable
_repo
_url
== "None":
93 self
._stable
_repo
_url
= None
95 # Lock to avoid concurrent execution of helm commands
96 self
.cmd_lock
= asyncio
.Lock()
98 def _get_namespace(self
, cluster_uuid
: str) -> str:
100 Obtains the namespace used by the cluster with the uuid passed by argument
102 param: cluster_uuid: cluster's uuid
105 # first, obtain the cluster corresponding to the uuid passed by argument
106 k8scluster
= self
.db
.get_one(
107 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
109 return k8scluster
.get("namespace")
114 namespace
: str = "kube-system",
115 reuse_cluster_uuid
=None,
117 ) -> tuple[str, bool]:
119 It prepares a given K8s cluster environment to run Charts
121 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
123 :param namespace: optional namespace to be used for helm. By default,
124 'kube-system' will be used
125 :param reuse_cluster_uuid: existing cluster uuid for reuse
126 :param kwargs: Additional parameters (None yet)
127 :return: uuid of the K8s cluster and True if connector has installed some
128 software in the cluster
129 (on error, an exception will be raised)
132 if reuse_cluster_uuid
:
133 cluster_id
= reuse_cluster_uuid
135 cluster_id
= str(uuid4())
138 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
141 paths
, env
= self
._init
_paths
_env
(
142 cluster_name
=cluster_id
, create_if_not_exist
=True
144 mode
= stat
.S_IRUSR | stat
.S_IWUSR
145 with
open(paths
["kube_config"], "w", mode
) as f
:
147 os
.chmod(paths
["kube_config"], 0o600)
149 # Code with initialization specific of helm version
150 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
152 # sync fs with local data
153 self
.fs
.reverse_sync(from_path
=cluster_id
)
155 self
.log
.info("Cluster {} initialized".format(cluster_id
))
157 return cluster_id
, n2vc_installed_sw
164 repo_type
: str = "chart",
167 password
: str = None,
170 "Cluster {}, adding {} repository {}. URL: {}".format(
171 cluster_uuid
, repo_type
, name
, url
176 paths
, env
= self
._init
_paths
_env
(
177 cluster_name
=cluster_uuid
, create_if_not_exist
=True
181 self
.fs
.sync(from_path
=cluster_uuid
)
183 # helm repo add name url
184 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
185 paths
["kube_config"], self
._helm
_command
, quote(name
), quote(url
)
189 temp_cert_file
= os
.path
.join(
190 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
192 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
193 with
open(temp_cert_file
, "w") as the_cert
:
195 command
+= " --ca-file {}".format(quote(temp_cert_file
))
198 command
+= " --username={}".format(quote(user
))
201 command
+= " --password={}".format(quote(password
))
203 self
.log
.debug("adding repo: {}".format(command
))
204 await self
._local
_async
_exec
(
205 command
=command
, raise_exception_on_error
=True, env
=env
209 command
= "env KUBECONFIG={} {} repo update {}".format(
210 paths
["kube_config"], self
._helm
_command
, quote(name
)
212 self
.log
.debug("updating repo: {}".format(command
))
213 await self
._local
_async
_exec
(
214 command
=command
, raise_exception_on_error
=False, env
=env
218 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
220 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
222 "Cluster {}, updating {} repository {}".format(
223 cluster_uuid
, repo_type
, name
228 paths
, env
= self
._init
_paths
_env
(
229 cluster_name
=cluster_uuid
, create_if_not_exist
=True
233 self
.fs
.sync(from_path
=cluster_uuid
)
236 command
= "{} repo update {}".format(self
._helm
_command
, quote(name
))
237 self
.log
.debug("updating repo: {}".format(command
))
238 await self
._local
_async
_exec
(
239 command
=command
, raise_exception_on_error
=False, env
=env
243 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
245 async def repo_list(self
, cluster_uuid
: str) -> list:
247 Get the list of registered repositories
249 :return: list of registered repositories: [ (name, url) .... ]
252 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
255 paths
, env
= self
._init
_paths
_env
(
256 cluster_name
=cluster_uuid
, create_if_not_exist
=True
260 self
.fs
.sync(from_path
=cluster_uuid
)
262 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
263 paths
["kube_config"], self
._helm
_command
266 # Set exception to false because if there are no repos just want an empty list
267 output
, _rc
= await self
._local
_async
_exec
(
268 command
=command
, raise_exception_on_error
=False, env
=env
272 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
275 if output
and len(output
) > 0:
276 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
277 # unify format between helm2 and helm3 setting all keys lowercase
278 return self
._lower
_keys
_list
(repos
)
284 async def repo_remove(self
, cluster_uuid
: str, name
: str):
286 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
290 paths
, env
= self
._init
_paths
_env
(
291 cluster_name
=cluster_uuid
, create_if_not_exist
=True
295 self
.fs
.sync(from_path
=cluster_uuid
)
297 command
= "env KUBECONFIG={} {} repo remove {}".format(
298 paths
["kube_config"], self
._helm
_command
, quote(name
)
300 await self
._local
_async
_exec
(
301 command
=command
, raise_exception_on_error
=True, env
=env
305 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
311 uninstall_sw
: bool = False,
316 Resets the Kubernetes cluster by removing the helm deployment that represents it.
318 :param cluster_uuid: The UUID of the cluster to reset
319 :param force: Boolean to force the reset
320 :param uninstall_sw: Boolean to force the reset
321 :param kwargs: Additional parameters (None yet)
322 :return: Returns True if successful or raises an exception.
324 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
326 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
327 cluster_uuid
, uninstall_sw
332 self
.fs
.sync(from_path
=cluster_uuid
)
334 # uninstall releases if needed.
336 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
337 if len(releases
) > 0:
341 kdu_instance
= r
.get("name")
342 chart
= r
.get("chart")
344 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
346 await self
.uninstall(
347 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
349 except Exception as e
:
350 # will not raise exception as it was found
351 # that in some cases of previously installed helm releases it
354 "Error uninstalling release {}: {}".format(
360 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
361 ).format(cluster_uuid
)
364 False # Allow to remove k8s cluster without removing Tiller
368 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
370 # delete cluster directory
371 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
372 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
373 # Remove also local directorio if still exist
374 direct
= self
.fs
.path
+ "/" + cluster_uuid
375 shutil
.rmtree(direct
, ignore_errors
=True)
379 def _is_helm_chart_a_file(self
, chart_name
: str):
380 return chart_name
.count("/") > 1
382 async def _install_impl(
390 timeout
: float = 300,
392 db_dict
: dict = None,
393 kdu_name
: str = None,
394 namespace
: str = None,
397 paths
, env
= self
._init
_paths
_env
(
398 cluster_name
=cluster_id
, create_if_not_exist
=True
402 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
403 cluster_id
=cluster_id
, params
=params
407 kdu_model
, version
= self
._split
_version
(kdu_model
)
409 _
, repo
= self
._split
_repo
(kdu_model
)
411 await self
.repo_update(cluster_id
, repo
)
413 command
= self
._get
_install
_command
(
421 paths
["kube_config"],
424 self
.log
.debug("installing: {}".format(command
))
427 # exec helm in a task
428 exec_task
= asyncio
.ensure_future(
429 coro_or_future
=self
._local
_async
_exec
(
430 command
=command
, raise_exception_on_error
=False, env
=env
434 # write status in another task
435 status_task
= asyncio
.ensure_future(
436 coro_or_future
=self
._store
_status
(
437 cluster_id
=cluster_id
,
438 kdu_instance
=kdu_instance
,
445 # wait for execution task
446 await asyncio
.wait([exec_task
])
451 output
, rc
= exec_task
.result()
454 output
, rc
= await self
._local
_async
_exec
(
455 command
=command
, raise_exception_on_error
=False, env
=env
458 # remove temporal values yaml file
460 os
.remove(file_to_delete
)
463 await self
._store
_status
(
464 cluster_id
=cluster_id
,
465 kdu_instance
=kdu_instance
,
472 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
474 raise K8sException(msg
)
480 kdu_model
: str = None,
482 timeout
: float = 300,
484 db_dict
: dict = None,
485 namespace
: str = None,
488 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
491 self
.fs
.sync(from_path
=cluster_uuid
)
493 # look for instance to obtain namespace
497 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
498 if not instance_info
:
499 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
500 namespace
= instance_info
["namespace"]
503 paths
, env
= self
._init
_paths
_env
(
504 cluster_name
=cluster_uuid
, create_if_not_exist
=True
508 self
.fs
.sync(from_path
=cluster_uuid
)
511 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
512 cluster_id
=cluster_uuid
, params
=params
516 kdu_model
, version
= self
._split
_version
(kdu_model
)
518 _
, repo
= self
._split
_repo
(kdu_model
)
520 await self
.repo_update(cluster_uuid
, repo
)
522 command
= self
._get
_upgrade
_command
(
530 paths
["kube_config"],
534 self
.log
.debug("upgrading: {}".format(command
))
537 # exec helm in a task
538 exec_task
= asyncio
.ensure_future(
539 coro_or_future
=self
._local
_async
_exec
(
540 command
=command
, raise_exception_on_error
=False, env
=env
543 # write status in another task
544 status_task
= asyncio
.ensure_future(
545 coro_or_future
=self
._store
_status
(
546 cluster_id
=cluster_uuid
,
547 kdu_instance
=kdu_instance
,
554 # wait for execution task
555 await asyncio
.wait([exec_task
])
559 output
, rc
= exec_task
.result()
562 output
, rc
= await self
._local
_async
_exec
(
563 command
=command
, raise_exception_on_error
=False, env
=env
566 # remove temporal values yaml file
568 os
.remove(file_to_delete
)
571 await self
._store
_status
(
572 cluster_id
=cluster_uuid
,
573 kdu_instance
=kdu_instance
,
580 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
582 raise K8sException(msg
)
585 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
587 # return new revision number
588 instance
= await self
.get_instance_info(
589 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
592 revision
= int(instance
.get("revision"))
593 self
.log
.debug("New revision: {}".format(revision
))
603 total_timeout
: float = 1800,
604 cluster_uuid
: str = None,
605 kdu_model
: str = None,
607 db_dict
: dict = None,
610 """Scale a resource in a Helm Chart.
613 kdu_instance: KDU instance name
614 scale: Scale to which to set the resource
615 resource_name: Resource name
616 total_timeout: The time, in seconds, to wait
617 cluster_uuid: The UUID of the cluster
618 kdu_model: The chart reference
619 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
620 The --wait flag will be set automatically if --atomic is used
621 db_dict: Dictionary for any additional data
622 kwargs: Additional parameters
625 True if successful, False otherwise
628 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
630 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
631 resource_name
, kdu_model
, cluster_uuid
634 self
.log
.debug(debug_mgs
)
636 # look for instance to obtain namespace
637 # get_instance_info function calls the sync command
638 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
639 if not instance_info
:
640 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
643 paths
, env
= self
._init
_paths
_env
(
644 cluster_name
=cluster_uuid
, create_if_not_exist
=True
648 kdu_model
, version
= self
._split
_version
(kdu_model
)
650 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
652 _
, replica_str
= await self
._get
_replica
_count
_url
(
653 kdu_model
, repo_url
, resource_name
656 command
= self
._get
_upgrade
_scale
_command
(
659 instance_info
["namespace"],
666 paths
["kube_config"],
669 self
.log
.debug("scaling: {}".format(command
))
672 # exec helm in a task
673 exec_task
= asyncio
.ensure_future(
674 coro_or_future
=self
._local
_async
_exec
(
675 command
=command
, raise_exception_on_error
=False, env
=env
678 # write status in another task
679 status_task
= asyncio
.ensure_future(
680 coro_or_future
=self
._store
_status
(
681 cluster_id
=cluster_uuid
,
682 kdu_instance
=kdu_instance
,
683 namespace
=instance_info
["namespace"],
689 # wait for execution task
690 await asyncio
.wait([exec_task
])
694 output
, rc
= exec_task
.result()
697 output
, rc
= await self
._local
_async
_exec
(
698 command
=command
, raise_exception_on_error
=False, env
=env
702 await self
._store
_status
(
703 cluster_id
=cluster_uuid
,
704 kdu_instance
=kdu_instance
,
705 namespace
=instance_info
["namespace"],
711 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
713 raise K8sException(msg
)
716 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
720 async def get_scale_count(
728 """Get a resource scale count.
731 cluster_uuid: The UUID of the cluster
732 resource_name: Resource name
733 kdu_instance: KDU instance name
734 kdu_model: The name or path of an Helm Chart
735 kwargs: Additional parameters
738 Resource instance count
742 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
745 # look for instance to obtain namespace
746 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
747 if not instance_info
:
748 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
751 paths
, _
= self
._init
_paths
_env
(
752 cluster_name
=cluster_uuid
, create_if_not_exist
=True
755 replicas
= await self
._get
_replica
_count
_instance
(
756 kdu_instance
=kdu_instance
,
757 namespace
=instance_info
["namespace"],
758 kubeconfig
=paths
["kube_config"],
759 resource_name
=resource_name
,
763 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
766 # Get default value if scale count is not found from provided values
767 # Important note: this piece of code shall only be executed in the first scaling operation,
768 # since it is expected that the _get_replica_count_instance is able to obtain the number of
769 # replicas when a scale operation was already conducted previously for this KDU/resource!
771 repo_url
= await self
._find
_repo
(
772 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
774 replicas
, _
= await self
._get
_replica
_count
_url
(
775 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
779 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
780 f
"{resource_name} obtained: {replicas}"
784 msg
= "Replica count not found. Cannot be scaled"
786 raise K8sException(msg
)
791 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
794 "rollback kdu_instance {} to revision {} from cluster {}".format(
795 kdu_instance
, revision
, cluster_uuid
800 self
.fs
.sync(from_path
=cluster_uuid
)
802 # look for instance to obtain namespace
803 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
804 if not instance_info
:
805 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
808 paths
, env
= self
._init
_paths
_env
(
809 cluster_name
=cluster_uuid
, create_if_not_exist
=True
813 self
.fs
.sync(from_path
=cluster_uuid
)
815 command
= self
._get
_rollback
_command
(
816 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
819 self
.log
.debug("rolling_back: {}".format(command
))
821 # exec helm in a task
822 exec_task
= asyncio
.ensure_future(
823 coro_or_future
=self
._local
_async
_exec
(
824 command
=command
, raise_exception_on_error
=False, env
=env
827 # write status in another task
828 status_task
= asyncio
.ensure_future(
829 coro_or_future
=self
._store
_status
(
830 cluster_id
=cluster_uuid
,
831 kdu_instance
=kdu_instance
,
832 namespace
=instance_info
["namespace"],
834 operation
="rollback",
838 # wait for execution task
839 await asyncio
.wait([exec_task
])
844 output
, rc
= exec_task
.result()
847 await self
._store
_status
(
848 cluster_id
=cluster_uuid
,
849 kdu_instance
=kdu_instance
,
850 namespace
=instance_info
["namespace"],
852 operation
="rollback",
856 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
858 raise K8sException(msg
)
861 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
863 # return new revision number
864 instance
= await self
.get_instance_info(
865 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
868 revision
= int(instance
.get("revision"))
869 self
.log
.debug("New revision: {}".format(revision
))
874 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
876 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
877 (this call should happen after all _terminate-config-primitive_ of the VNF
880 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
881 :param kdu_instance: unique name for the KDU instance to be deleted
882 :param kwargs: Additional parameters (None yet)
883 :return: True if successful
887 "uninstall kdu_instance {} from cluster {}".format(
888 kdu_instance
, cluster_uuid
893 self
.fs
.sync(from_path
=cluster_uuid
)
895 # look for instance to obtain namespace
896 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
897 if not instance_info
:
898 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
901 paths
, env
= self
._init
_paths
_env
(
902 cluster_name
=cluster_uuid
, create_if_not_exist
=True
906 self
.fs
.sync(from_path
=cluster_uuid
)
908 command
= self
._get
_uninstall
_command
(
909 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
911 output
, _rc
= await self
._local
_async
_exec
(
912 command
=command
, raise_exception_on_error
=True, env
=env
916 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
918 return self
._output
_to
_table
(output
)
920 async def instances_list(self
, cluster_uuid
: str) -> list:
922 returns a list of deployed releases in a cluster
924 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
928 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
931 self
.fs
.sync(from_path
=cluster_uuid
)
933 # execute internal command
934 result
= await self
._instances
_list
(cluster_uuid
)
937 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """Return the release record of a deployed KDU instance, or None.

    :param cluster_uuid: the cluster where the instance is searched
    :param kdu_instance: unique name of the KDU instance
    :return: the matching entry from :meth:`instances_list`, or None when
        no release with that name exists in the cluster
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            return instance
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
949 async def upgrade_charm(
953 charm_id
: str = None,
954 charm_type
: str = None,
955 timeout
: float = None,
957 """This method upgrade charms in VNFs
960 ee_id: Execution environment id
961 path: Local path to the charm
963 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
964 timeout: (Float) Timeout for the ns update operation
967 The output of the update operation if status equals to "completed"
969 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
971 async def exec_primitive(
973 cluster_uuid
: str = None,
974 kdu_instance
: str = None,
975 primitive_name
: str = None,
976 timeout
: float = 300,
978 db_dict
: dict = None,
981 """Exec primitive (Juju action)
983 :param cluster_uuid: The UUID of the cluster or namespace:cluster
984 :param kdu_instance: The unique name of the KDU instance
985 :param primitive_name: Name of action that will be executed
986 :param timeout: Timeout for action execution
987 :param params: Dictionary of all the parameters needed for the action
988 :db_dict: Dictionary for any additional data
989 :param kwargs: Additional parameters (None yet)
991 :return: Returns the output of the action
994 "KDUs deployed with Helm don't support actions "
995 "different from rollback, upgrade and status"
998 async def get_services(
999 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
1002 Returns a list of services defined for the specified kdu instance.
1004 :param cluster_uuid: UUID of a K8s cluster known by OSM
1005 :param kdu_instance: unique name for the KDU instance
1006 :param namespace: K8s namespace used by the KDU instance
1007 :return: If successful, it will return a list of services, Each service
1008 can have the following data:
1009 - `name` of the service
1010 - `type` type of service in the k8 cluster
1011 - `ports` List of ports offered by the service, for each port includes at least
1012 name, port, protocol
1013 - `cluster_ip` Internal ip to be used inside k8s cluster
1014 - `external_ip` List of external ips (in case they are available)
1018 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1019 cluster_uuid
, kdu_instance
1024 paths
, env
= self
._init
_paths
_env
(
1025 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1029 self
.fs
.sync(from_path
=cluster_uuid
)
1031 # get list of services names for kdu
1032 service_names
= await self
._get
_services
(
1033 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1037 for service
in service_names
:
1038 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1039 service_list
.append(service
)
1042 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1046 async def get_service(
1047 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1050 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1051 service_name
, namespace
, cluster_uuid
1056 self
.fs
.sync(from_path
=cluster_uuid
)
1058 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1061 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1065 async def status_kdu(
1066 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1067 ) -> Union
[str, dict]:
1069 This call would retrieve tha current state of a given KDU instance. It would be
1070 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1071 values_ of the configuration parameters applied to a given instance. This call
1072 would be based on the `status` call.
1074 :param cluster_uuid: UUID of a K8s cluster known by OSM
1075 :param kdu_instance: unique name for the KDU instance
1076 :param kwargs: Additional parameters (None yet)
1077 :param yaml_format: if the return shall be returned as an YAML string or as a
1079 :return: If successful, it will return the following vector of arguments:
1080 - K8s `namespace` in the cluster where the KDU lives
1081 - `state` of the KDU instance. It can be:
1088 - List of `resources` (objects) that this release consists of, sorted by kind,
1089 and the status of those resources
1090 - Last `deployment_time`.
1094 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1095 cluster_uuid
, kdu_instance
1100 self
.fs
.sync(from_path
=cluster_uuid
)
1102 # get instance: needed to obtain namespace
1103 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1104 for instance
in instances
:
1105 if instance
.get("name") == kdu_instance
:
1108 # instance does not exist
1110 "Instance name: {} not found in cluster: {}".format(
1111 kdu_instance
, cluster_uuid
1115 status
= await self
._status
_kdu
(
1116 cluster_id
=cluster_uuid
,
1117 kdu_instance
=kdu_instance
,
1118 namespace
=instance
["namespace"],
1119 yaml_format
=yaml_format
,
1120 show_error_log
=True,
1124 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """Return the values applied to a deployed KDU instance.

    :param kdu_instance: unique name of the deployed KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :param kubeconfig: path to the kubeconfig file of the target cluster
    :return: output of the helm version dependent "get values" command
    """
    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    # delegate to the helm version dependent implementation
    return await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
1140 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1141 """Method to obtain the Helm Chart package's values
1144 kdu_model: The name or path of an Helm Chart
1145 repo_url: Helm Chart repository url
1148 str: the values of the Helm Chart package
1152 "inspect kdu_model values {} from (optional) repo: {}".format(
1157 return await self
._exec
_inspect
_command
(
1158 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Return the README of a Helm Chart package.

    :param kdu_model: name or path of the Helm Chart
    :param repo_url: optional Helm Chart repository url
    :return: the chart readme as produced by the inspect/show command
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    # delegate to the helm version dependent implementation
    return await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
1170 async def synchronize_repos(self
, cluster_uuid
: str):
1171 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1173 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1174 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1176 local_repo_list
= await self
.repo_list(cluster_uuid
)
1177 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1179 deleted_repo_list
= []
1180 added_repo_dict
= {}
1182 # iterate over the list of repos in the database that should be
1183 # added if not present
1184 for repo_name
, db_repo
in db_repo_dict
.items():
1186 # check if it is already present
1187 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1188 repo_id
= db_repo
.get("_id")
1189 if curr_repo_url
!= db_repo
["url"]:
1192 "repo {} url changed, delete and and again".format(
1196 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1197 deleted_repo_list
.append(repo_id
)
1200 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1201 if "ca_cert" in db_repo
:
1202 await self
.repo_add(
1206 cert
=db_repo
["ca_cert"],
1209 await self
.repo_add(
1214 added_repo_dict
[repo_id
] = db_repo
["name"]
1215 except Exception as e
:
1217 "Error adding repo id: {}, err_msg: {} ".format(
1222 # Delete repos that are present but not in nbi_list
1223 for repo_name
in local_repo_dict
:
1224 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1225 self
.log
.debug("delete repo {}".format(repo_name
))
1227 await self
.repo_remove(cluster_uuid
, repo_name
)
1228 deleted_repo_list
.append(repo_name
)
1229 except Exception as e
:
1231 "Error deleting repo, name: {}, err_msg: {}".format(
1236 return deleted_repo_list
, added_repo_dict
1238 except K8sException
:
1240 except Exception as e
:
1241 # Do not raise errors synchronizing repos
1242 self
.log
.error("Error synchronizing repos: {}".format(e
))
1243 raise Exception("Error synchronizing repos: {}".format(e
))
1245 def _get_db_repos_dict(self
, repo_ids
: list):
1247 for repo_id
in repo_ids
:
1248 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1249 db_repos_dict
[db_repo
["name"]] = db_repo
1250 return db_repos_dict
1253 ####################################################################################
1254 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1255 ####################################################################################
1259 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1261 Creates and returns base cluster and kube dirs and returns them.
1262 Also created helm3 dirs according to new directory specification, paths are
1263 not returned but assigned to helm environment variables
1265 :param cluster_name: cluster_name
1266 :return: Dictionary with config_paths and dictionary with helm environment variables
1270 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1272 Implements the helm version dependent cluster initialization
1276 async def _instances_list(self
, cluster_id
):
1278 Implements the helm version dependent helm instances list
1282 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1284 Implements the helm version dependent method to obtain services from a helm instance
1288 async def _status_kdu(
1292 namespace
: str = None,
1293 yaml_format
: bool = False,
1294 show_error_log
: bool = False,
1295 ) -> Union
[str, dict]:
1297 Implements the helm version dependent method to obtain status of a helm instance
1301 def _get_install_command(
1313 Obtain command to be executed to delete the indicated instance
1317 def _get_upgrade_scale_command(
1330 """Generates the command to scale a Helm Chart release
1333 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1334 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1335 namespace (str): Namespace where this KDU instance is deployed
1336 scale (int): Scale count
1337 version (str): Constraint with specific version of the Chart to use
1338 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1339 The --wait flag will be set automatically if --atomic is used
1340 replica_str (str): The key under resource_name key where the scale count is stored
1341 timeout (float): The time, in seconds, to wait
1342 resource_name (str): The KDU's resource to scale
1343 kubeconfig (str): Kubeconfig file path
1346 str: command to scale a Helm Chart release
1350 def _get_upgrade_command(
1362 """Generates the command to upgrade a Helm Chart release
1365 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1366 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1367 namespace (str): Namespace where this KDU instance is deployed
1368 params_str (str): Params used to upgrade the Helm Chart release
1369 version (str): Constraint with specific version of the Chart to use
1370 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1371 The --wait flag will be set automatically if --atomic is used
1372 timeout (float): The time, in seconds, to wait
1373 kubeconfig (str): Kubeconfig file path
1374 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1376 str: command to upgrade a Helm Chart release
1380 def _get_rollback_command(
1381 self
, kdu_instance
, namespace
, revision
, kubeconfig
1384 Obtain command to be executed to rollback the indicated instance
1388 def _get_uninstall_command(
1389 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1392 Obtain command to be executed to delete the indicated instance
1396 def _get_inspect_command(
1397 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1399 """Generates the command to obtain the information about an Helm Chart package
1400 (´helm show ...´ command)
1403 show_command: the second part of the command (`helm show <show_command>`)
1404 kdu_model: The name or path of an Helm Chart
1405 repo_url: Helm Chart repository url
1406 version: constraint with specific version of the Chart to use
1409 str: the generated Helm Chart command
1413 def _get_get_command(
1414 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1416 """Obtain command to be executed to get information about the kdu instance."""
1419 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1421 Method call to uninstall cluster software for helm. This method is dependent
1423 For Helm v2 it will be called when Tiller must be uninstalled
1424 For Helm v3 it does nothing and does not need to be callled
1428 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1430 Obtains the cluster repos identifiers
1434 ####################################################################################
1435 ################################### P R I V A T E ##################################
1436 ####################################################################################
1440 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1441 if os
.path
.exists(filename
):
1444 msg
= "File {} does not exist".format(filename
)
1445 if exception_if_not_exists
:
1446 raise K8sException(msg
)
1449 def _remove_multiple_spaces(strobj
):
1450 strobj
= strobj
.strip()
1451 while " " in strobj
:
1452 strobj
= strobj
.replace(" ", " ")
1456 def _output_to_lines(output
: str) -> list:
1457 output_lines
= list()
1458 lines
= output
.splitlines(keepends
=False)
1462 output_lines
.append(line
)
1466 def _output_to_table(output
: str) -> list:
1467 output_table
= list()
1468 lines
= output
.splitlines(keepends
=False)
1470 line
= line
.replace("\t", " ")
1472 output_table
.append(line_list
)
1473 cells
= line
.split(sep
=" ")
1477 line_list
.append(cell
)
1481 def _parse_services(output
: str) -> list:
1482 lines
= output
.splitlines(keepends
=False)
1485 line
= line
.replace("\t", " ")
1486 cells
= line
.split(sep
=" ")
1487 if len(cells
) > 0 and cells
[0].startswith("service/"):
1488 elems
= cells
[0].split(sep
="/")
1490 services
.append(elems
[1])
1494 def _get_deep(dictionary
: dict, members
: tuple):
1499 value
= target
.get(m
)
1508 # find key:value in several lines
1510 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1511 for line
in p_lines
:
1513 if line
.startswith(p_key
+ ":"):
1514 parts
= line
.split(":")
1515 the_value
= parts
[1].strip()
1523 def _lower_keys_list(input_list
: list):
1525 Transform the keys in a list of dictionaries to lower case and returns a new list
1530 for dictionary
in input_list
:
1531 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1532 new_list
.append(new_dict
)
1535 async def _local_async_exec(
1538 raise_exception_on_error
: bool = False,
1539 show_error_log
: bool = True,
1540 encode_utf8
: bool = False,
1542 ) -> tuple[str, int]:
1543 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1545 "Executing async local command: {}, env: {}".format(command
, env
)
1549 command
= shlex
.split(command
)
1551 environ
= os
.environ
.copy()
1556 async with self
.cmd_lock
:
1557 process
= await asyncio
.create_subprocess_exec(
1559 stdout
=asyncio
.subprocess
.PIPE
,
1560 stderr
=asyncio
.subprocess
.PIPE
,
1564 # wait for command terminate
1565 stdout
, stderr
= await process
.communicate()
1567 return_code
= process
.returncode
1571 output
= stdout
.decode("utf-8").strip()
1572 # output = stdout.decode()
1574 output
= stderr
.decode("utf-8").strip()
1575 # output = stderr.decode()
1577 if return_code
!= 0 and show_error_log
:
1579 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1582 self
.log
.debug("Return code: {}".format(return_code
))
1584 if raise_exception_on_error
and return_code
!= 0:
1585 raise K8sException(output
)
1588 output
= output
.encode("utf-8").strip()
1589 output
= str(output
).replace("\\n", "\n")
1591 return output
, return_code
1593 except asyncio
.CancelledError
:
1594 # first, kill the process if it is still running
1595 if process
.returncode
is None:
1598 except K8sException
:
1600 except Exception as e
:
1601 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1603 if raise_exception_on_error
:
1604 raise K8sException(e
) from e
1608 async def _local_async_exec_pipe(
1612 raise_exception_on_error
: bool = True,
1613 show_error_log
: bool = True,
1614 encode_utf8
: bool = False,
1617 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1618 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1619 command
= "{} | {}".format(command1
, command2
)
1621 "Executing async local command: {}, env: {}".format(command
, env
)
1625 command1
= shlex
.split(command1
)
1626 command2
= shlex
.split(command2
)
1628 environ
= os
.environ
.copy()
1633 async with self
.cmd_lock
:
1634 read
, write
= os
.pipe()
1635 process_1
= await asyncio
.create_subprocess_exec(
1636 *command1
, stdout
=write
, env
=environ
1639 process_2
= await asyncio
.create_subprocess_exec(
1640 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1643 stdout
, stderr
= await process_2
.communicate()
1645 return_code
= process_2
.returncode
1649 output
= stdout
.decode("utf-8").strip()
1650 # output = stdout.decode()
1652 output
= stderr
.decode("utf-8").strip()
1653 # output = stderr.decode()
1655 if return_code
!= 0 and show_error_log
:
1657 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1660 self
.log
.debug("Return code: {}".format(return_code
))
1662 if raise_exception_on_error
and return_code
!= 0:
1663 raise K8sException(output
)
1666 output
= output
.encode("utf-8").strip()
1667 output
= str(output
).replace("\\n", "\n")
1669 return output
, return_code
1670 except asyncio
.CancelledError
:
1671 # first, kill the processes if they are still running
1672 for process
in (process_1
, process_2
):
1673 if process
.returncode
is None:
1676 except K8sException
:
1678 except Exception as e
:
1679 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1681 if raise_exception_on_error
:
1682 raise K8sException(e
) from e
1686 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1688 Obtains the data of the specified service in the k8cluster.
1690 :param cluster_id: id of a K8s cluster known by OSM
1691 :param service_name: name of the K8s service in the specified namespace
1692 :param namespace: K8s namespace used by the KDU instance
1693 :return: If successful, it will return a service with the following data:
1694 - `name` of the service
1695 - `type` type of service in the k8 cluster
1696 - `ports` List of ports offered by the service, for each port includes at least
1697 name, port, protocol
1698 - `cluster_ip` Internal ip to be used inside k8s cluster
1699 - `external_ip` List of external ips (in case they are available)
1703 paths
, env
= self
._init
_paths
_env
(
1704 cluster_name
=cluster_id
, create_if_not_exist
=True
1707 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1708 self
.kubectl_command
,
1709 paths
["kube_config"],
1711 quote(service_name
),
1714 output
, _rc
= await self
._local
_async
_exec
(
1715 command
=command
, raise_exception_on_error
=True, env
=env
1718 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1721 "name": service_name
,
1722 "type": self
._get
_deep
(data
, ("spec", "type")),
1723 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1724 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1726 if service
["type"] == "LoadBalancer":
1727 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1728 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1729 service
["external_ip"] = ip_list
1733 async def _exec_get_command(
1734 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1736 """Obtains information about the kdu instance."""
1738 full_command
= self
._get
_get
_command
(
1739 get_command
, kdu_instance
, namespace
, kubeconfig
1742 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1746 async def _exec_inspect_command(
1747 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1749 """Obtains information about an Helm Chart package (´helm show´ command)
1752 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1753 kdu_model: The name or path of an Helm Chart
1754 repo_url: Helm Chart repository url
1757 str: the requested info about the Helm Chart package
1762 repo_str
= " --repo {}".format(quote(repo_url
))
1764 # Obtain the Chart's name and store it in the var kdu_model
1765 kdu_model
, _
= self
._split
_repo
(kdu_model
=kdu_model
)
1767 kdu_model
, version
= self
._split
_version
(kdu_model
)
1769 version_str
= "--version {}".format(quote(version
))
1773 full_command
= self
._get
_inspect
_command
(
1774 show_command
=inspect_command
,
1775 kdu_model
=quote(kdu_model
),
1777 version
=version_str
,
1780 output
, _
= await self
._local
_async
_exec
(command
=full_command
)
1784 async def _get_replica_count_url(
1787 repo_url
: str = None,
1788 resource_name
: str = None,
1789 ) -> tuple[int, str]:
1790 """Get the replica count value in the Helm Chart Values.
1793 kdu_model: The name or path of an Helm Chart
1794 repo_url: Helm Chart repository url
1795 resource_name: Resource name
1799 - The number of replicas of the specific instance; if not found, returns None; and
1800 - The string corresponding to the replica count key in the Helm values
1803 kdu_values
= yaml
.load(
1804 await self
.values_kdu(kdu_model
=kdu_model
, repo_url
=repo_url
),
1805 Loader
=yaml
.SafeLoader
,
1808 self
.log
.debug(f
"Obtained the Helm package values for the KDU: {kdu_values}")
1812 "kdu_values not found for kdu_model {}".format(kdu_model
)
1816 kdu_values
= kdu_values
.get(resource_name
, None)
1819 msg
= "resource {} not found in the values in model {}".format(
1820 resource_name
, kdu_model
1823 raise K8sException(msg
)
1825 duplicate_check
= False
1830 if kdu_values
.get("replicaCount") is not None:
1831 replicas
= kdu_values
["replicaCount"]
1832 replica_str
= "replicaCount"
1833 elif kdu_values
.get("replicas") is not None:
1834 duplicate_check
= True
1835 replicas
= kdu_values
["replicas"]
1836 replica_str
= "replicas"
1840 "replicaCount or replicas not found in the resource"
1841 "{} values in model {}. Cannot be scaled".format(
1842 resource_name
, kdu_model
1847 "replicaCount or replicas not found in the values"
1848 "in model {}. Cannot be scaled".format(kdu_model
)
1851 raise K8sException(msg
)
1853 # Control if replicas and replicaCount exists at the same time
1854 msg
= "replicaCount and replicas are exists at the same time"
1856 if "replicaCount" in kdu_values
:
1858 raise K8sException(msg
)
1860 if "replicas" in kdu_values
:
1862 raise K8sException(msg
)
1864 return replicas
, replica_str
1866 async def _get_replica_count_instance(
1871 resource_name
: str = None,
1873 """Get the replica count value in the instance.
1876 kdu_instance: The name of the KDU instance
1877 namespace: KDU instance namespace
1879 resource_name: Resource name
1882 The number of replicas of the specific instance; if not found, returns None
1885 kdu_values
= yaml
.load(
1886 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1887 Loader
=yaml
.SafeLoader
,
1890 self
.log
.debug(f
"Obtained the Helm values for the KDU instance: {kdu_values}")
1896 kdu_values
.get(resource_name
, None) if resource_name
else None
1899 for replica_str
in ("replicaCount", "replicas"):
1901 replicas
= resource_values
.get(replica_str
)
1903 replicas
= kdu_values
.get(replica_str
)
1905 if replicas
is not None:
1910 async def _store_status(
1915 namespace
: str = None,
1916 db_dict
: dict = None,
1919 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1921 :param cluster_id (str): the cluster where the KDU instance is deployed
1922 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1923 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1924 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1925 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1926 values for the keys:
1927 - "collection": The Mongo DB collection to write to
1928 - "filter": The query filter to use in the update process
1929 - "path": The dot separated keys which targets the object to be updated
1934 detailed_status
= await self
._status
_kdu
(
1935 cluster_id
=cluster_id
,
1936 kdu_instance
=kdu_instance
,
1938 namespace
=namespace
,
1941 status
= detailed_status
.get("info").get("description")
1942 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1944 # write status to db
1945 result
= await self
.write_app_status_to_db(
1948 detailed_status
=str(detailed_status
),
1949 operation
=operation
,
1953 self
.log
.info("Error writing in database. Task exiting...")
1955 except asyncio
.CancelledError
as e
:
1957 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1959 except Exception as e
:
1960 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1962 # params for use in -f file
1963 # returns values file option and filename (in order to delete it at the end)
1964 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> tuple[str, str]:
1965 if params
and len(params
) > 0:
1966 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1968 def get_random_number():
1969 r
= random
.SystemRandom().randint(1, 99999999)
1977 value
= params
.get(key
)
1978 if "!!yaml" in str(value
):
1979 value
= yaml
.safe_load(value
[7:])
1980 params2
[key
] = value
1982 values_file
= get_random_number() + ".yaml"
1983 with
open(values_file
, "w") as stream
:
1984 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1986 return "-f {}".format(values_file
), values_file
1990 # params for use in --set option
1992 def _params_to_set_option(params
: dict) -> str:
1994 f
"{quote(str(key))}={quote(str(value))}"
1995 for key
, value
in params
.items()
1996 if value
is not None
2000 return "--set " + ",".join(pairs
)
2003 def generate_kdu_instance_name(**kwargs
):
2004 chart_name
= kwargs
["kdu_model"]
2005 # check embeded chart (file or dir)
2006 if chart_name
.startswith("/"):
2007 # extract file or directory name
2008 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
2010 elif "://" in chart_name
:
2011 # extract last portion of URL
2012 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
2015 for c
in chart_name
:
2016 if c
.isalpha() or c
.isnumeric():
2023 # if does not start with alpha character, prefix 'a'
2024 if not name
[0].isalpha():
2029 def get_random_number():
2030 r
= random
.SystemRandom().randint(1, 99999999)
2032 s
= s
.rjust(10, "0")
2035 name
= name
+ get_random_number()
2038 def _split_version(self
, kdu_model
: str) -> tuple[str, str]:
2040 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
2041 parts
= kdu_model
.split(sep
=":")
2043 version
= str(parts
[1])
2044 kdu_model
= parts
[0]
2045 return kdu_model
, version
2047 def _split_repo(self
, kdu_model
: str) -> tuple[str, str]:
2048 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2051 kdu_model (str): Associated KDU model
2054 (str, str): Tuple with the Chart name in index 0, and the repo name
2055 in index 2; if there was a problem finding them, return None
2062 idx
= kdu_model
.find("/")
2064 chart_name
= kdu_model
[idx
+ 1 :]
2065 repo_name
= kdu_model
[:idx
]
2067 return chart_name
, repo_name
2069 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2070 """Obtain the Helm repository for an Helm Chart
2073 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2074 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2077 str: the repository URL; if Helm Chart is a local one, the function returns None
2080 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2084 # Find repository link
2085 local_repo_list
= await self
.repo_list(cluster_uuid
)
2086 for repo
in local_repo_list
:
2087 if repo
["name"] == repo_name
:
2088 repo_url
= repo
["url"]
2089 break # it is not necessary to continue the loop if the repo link was found...
2093 async def create_certificate(
2094 self
, cluster_uuid
, namespace
, dns_prefix
, name
, secret_name
, usage
2096 paths
, env
= self
._init
_paths
_env
(
2097 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2099 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2100 await kubectl
.create_certificate(
2101 namespace
=namespace
,
2103 dns_prefix
=dns_prefix
,
2104 secret_name
=secret_name
,
2106 issuer_name
="ca-issuer",
2109 async def delete_certificate(self
, cluster_uuid
, namespace
, certificate_name
):
2110 paths
, env
= self
._init
_paths
_env
(
2111 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2113 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2114 await kubectl
.delete_certificate(namespace
, certificate_name
)
2116 async def create_namespace(
2123 Create a namespace in a specific cluster
2125 :param namespace: Namespace to be created
2126 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2127 :param labels: Dictionary with labels for the new namespace
2130 paths
, env
= self
._init
_paths
_env
(
2131 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2133 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2134 await kubectl
.create_namespace(
2139 async def delete_namespace(
2145 Delete a namespace in a specific cluster
2147 :param namespace: namespace to be deleted
2148 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2151 paths
, env
= self
._init
_paths
_env
(
2152 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2154 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2155 await kubectl
.delete_namespace(
2159 async def copy_secret_data(
2165 src_namespace
: str = "osm",
2166 dst_namespace
: str = "osm",
2169 Copy a single key and value from an existing secret to a new one
2171 :param src_secret: name of the existing secret
2172 :param dst_secret: name of the new secret
2173 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2174 :param data_key: key of the existing secret to be copied
2175 :param src_namespace: Namespace of the existing secret
2176 :param dst_namespace: Namespace of the new secret
2179 paths
, env
= self
._init
_paths
_env
(
2180 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2182 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2183 secret_data
= await kubectl
.get_secret_content(
2185 namespace
=src_namespace
,
2187 # Only the corresponding data_key value needs to be copy
2188 data
= {data_key
: secret_data
.get(data_key
)}
2189 await kubectl
.create_secret(
2192 namespace
=dst_namespace
,
2193 secret_type
="Opaque",
2196 async def setup_default_rbac(
2207 Create a basic RBAC for a new namespace.
2209 :param name: name of both Role and Role Binding
2210 :param namespace: K8s namespace
2211 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2212 :param api_groups: Api groups to be allowed in Policy Rule
2213 :param resources: Resources to be allowed in Policy Rule
2214 :param verbs: Verbs to be allowed in Policy Rule
2215 :param service_account: Service Account name used to bind the Role
2218 paths
, env
= self
._init
_paths
_env
(
2219 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2221 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2222 await kubectl
.create_role(
2225 namespace
=namespace
,
2226 api_groups
=api_groups
,
2227 resources
=resources
,
2230 await kubectl
.create_role_binding(
2233 namespace
=namespace
,
2235 sa_name
=service_account
,