6ddf1186b72a41030bea9af97d903ff5ded33125
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 # Lock to avoid concurrent execution of helm commands
94 self
.cmd_lock
= asyncio
.Lock()
96 def _get_namespace(self
, cluster_uuid
: str) -> str:
98 Obtains the namespace used by the cluster with the uuid passed by argument
100 param: cluster_uuid: cluster's uuid
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster
= self
.db
.get_one(
105 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
107 return k8scluster
.get("namespace")
112 namespace
: str = "kube-system",
113 reuse_cluster_uuid
=None,
117 It prepares a given K8s cluster environment to run Charts
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
124 :param kwargs: Additional parameters (None yet)
125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
130 if reuse_cluster_uuid
:
131 cluster_id
= reuse_cluster_uuid
133 cluster_id
= str(uuid4())
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
139 paths
, env
= self
._init
_paths
_env
(
140 cluster_name
=cluster_id
, create_if_not_exist
=True
142 mode
= stat
.S_IRUSR | stat
.S_IWUSR
143 with
open(paths
["kube_config"], "w", mode
) as f
:
145 os
.chmod(paths
["kube_config"], 0o600)
147 # Code with initialization specific of helm version
148 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
150 # sync fs with local data
151 self
.fs
.reverse_sync(from_path
=cluster_id
)
153 self
.log
.info("Cluster {} initialized".format(cluster_id
))
155 return cluster_id
, n2vc_installed_sw
162 repo_type
: str = "chart",
165 password
: str = None,
168 "Cluster {}, adding {} repository {}. URL: {}".format(
169 cluster_uuid
, repo_type
, name
, url
174 paths
, env
= self
._init
_paths
_env
(
175 cluster_name
=cluster_uuid
, create_if_not_exist
=True
179 self
.fs
.sync(from_path
=cluster_uuid
)
181 # helm repo add name url
182 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
183 paths
["kube_config"], self
._helm
_command
, name
, url
187 temp_cert_file
= os
.path
.join(
188 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
190 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
191 with
open(temp_cert_file
, "w") as the_cert
:
193 command
+= " --ca-file {}".format(temp_cert_file
)
196 command
+= " --username={}".format(user
)
199 command
+= " --password={}".format(password
)
201 self
.log
.debug("adding repo: {}".format(command
))
202 await self
._local
_async
_exec
(
203 command
=command
, raise_exception_on_error
=True, env
=env
207 command
= "env KUBECONFIG={} {} repo update {}".format(
208 paths
["kube_config"], self
._helm
_command
, name
210 self
.log
.debug("updating repo: {}".format(command
))
211 await self
._local
_async
_exec
(
212 command
=command
, raise_exception_on_error
=False, env
=env
216 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
218 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid
, repo_type
, name
226 paths
, env
= self
._init
_paths
_env
(
227 cluster_name
=cluster_uuid
, create_if_not_exist
=True
231 self
.fs
.sync(from_path
=cluster_uuid
)
234 command
= "{} repo update {}".format(self
._helm
_command
, name
)
235 self
.log
.debug("updating repo: {}".format(command
))
236 await self
._local
_async
_exec
(
237 command
=command
, raise_exception_on_error
=False, env
=env
241 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
243 async def repo_list(self
, cluster_uuid
: str) -> list:
245 Get the list of registered repositories
247 :return: list of registered repositories: [ (name, url) .... ]
250 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
253 paths
, env
= self
._init
_paths
_env
(
254 cluster_name
=cluster_uuid
, create_if_not_exist
=True
258 self
.fs
.sync(from_path
=cluster_uuid
)
260 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths
["kube_config"], self
._helm
_command
264 # Set exception to false because if there are no repos just want an empty list
265 output
, _rc
= await self
._local
_async
_exec
(
266 command
=command
, raise_exception_on_error
=False, env
=env
270 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
273 if output
and len(output
) > 0:
274 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self
._lower
_keys
_list
(repos
)
282 async def repo_remove(self
, cluster_uuid
: str, name
: str):
284 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
288 paths
, env
= self
._init
_paths
_env
(
289 cluster_name
=cluster_uuid
, create_if_not_exist
=True
293 self
.fs
.sync(from_path
=cluster_uuid
)
295 command
= "env KUBECONFIG={} {} repo remove {}".format(
296 paths
["kube_config"], self
._helm
_command
, name
298 await self
._local
_async
_exec
(
299 command
=command
, raise_exception_on_error
=True, env
=env
303 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
309 uninstall_sw
: bool = False,
314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to also uninstall the software installed by the connector in the cluster
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
322 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
325 cluster_uuid
, uninstall_sw
330 self
.fs
.sync(from_path
=cluster_uuid
)
332 # uninstall releases if needed.
334 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
335 if len(releases
) > 0:
339 kdu_instance
= r
.get("name")
340 chart
= r
.get("chart")
342 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
344 await self
.uninstall(
345 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
347 except Exception as e
:
348 # will not raise exception as it was found
349 # that in some cases of previously installed helm releases it
352 "Error uninstalling release {}: {}".format(
358 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
359 ).format(cluster_uuid
)
362 False # Allow to remove k8s cluster without removing Tiller
366 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
368 # delete cluster directory
369 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
370 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
371 # Also remove the local directory if it still exists
372 direct
= self
.fs
.path
+ "/" + cluster_uuid
373 shutil
.rmtree(direct
, ignore_errors
=True)
377 def _is_helm_chart_a_file(self
, chart_name
: str):
378 return chart_name
.count("/") > 1
380 async def _install_impl(
388 timeout
: float = 300,
390 db_dict
: dict = None,
391 kdu_name
: str = None,
392 namespace
: str = None,
395 paths
, env
= self
._init
_paths
_env
(
396 cluster_name
=cluster_id
, create_if_not_exist
=True
400 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
401 cluster_id
=cluster_id
, params
=params
405 kdu_model
, version
= self
._split
_version
(kdu_model
)
407 _
, repo
= self
._split
_repo
(kdu_model
)
409 await self
.repo_update(cluster_id
, repo
)
411 command
= self
._get
_install
_command
(
419 paths
["kube_config"],
422 self
.log
.debug("installing: {}".format(command
))
425 # exec helm in a task
426 exec_task
= asyncio
.ensure_future(
427 coro_or_future
=self
._local
_async
_exec
(
428 command
=command
, raise_exception_on_error
=False, env
=env
432 # write status in another task
433 status_task
= asyncio
.ensure_future(
434 coro_or_future
=self
._store
_status
(
435 cluster_id
=cluster_id
,
436 kdu_instance
=kdu_instance
,
443 # wait for execution task
444 await asyncio
.wait([exec_task
])
449 output
, rc
= exec_task
.result()
453 output
, rc
= await self
._local
_async
_exec
(
454 command
=command
, raise_exception_on_error
=False, env
=env
457 # remove temporal values yaml file
459 os
.remove(file_to_delete
)
462 await self
._store
_status
(
463 cluster_id
=cluster_id
,
464 kdu_instance
=kdu_instance
,
471 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
473 raise K8sException(msg
)
479 kdu_model
: str = None,
481 timeout
: float = 300,
483 db_dict
: dict = None,
485 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
488 self
.fs
.sync(from_path
=cluster_uuid
)
490 # look for instance to obtain namespace
491 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
492 if not instance_info
:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
496 paths
, env
= self
._init
_paths
_env
(
497 cluster_name
=cluster_uuid
, create_if_not_exist
=True
501 self
.fs
.sync(from_path
=cluster_uuid
)
504 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
505 cluster_id
=cluster_uuid
, params
=params
509 kdu_model
, version
= self
._split
_version
(kdu_model
)
511 _
, repo
= self
._split
_repo
(kdu_model
)
513 await self
.repo_update(cluster_uuid
, repo
)
515 command
= self
._get
_upgrade
_command
(
518 instance_info
["namespace"],
523 paths
["kube_config"],
526 self
.log
.debug("upgrading: {}".format(command
))
530 # exec helm in a task
531 exec_task
= asyncio
.ensure_future(
532 coro_or_future
=self
._local
_async
_exec
(
533 command
=command
, raise_exception_on_error
=False, env
=env
536 # write status in another task
537 status_task
= asyncio
.ensure_future(
538 coro_or_future
=self
._store
_status
(
539 cluster_id
=cluster_uuid
,
540 kdu_instance
=kdu_instance
,
541 namespace
=instance_info
["namespace"],
547 # wait for execution task
548 await asyncio
.wait([exec_task
])
552 output
, rc
= exec_task
.result()
556 output
, rc
= await self
._local
_async
_exec
(
557 command
=command
, raise_exception_on_error
=False, env
=env
560 # remove temporal values yaml file
562 os
.remove(file_to_delete
)
565 await self
._store
_status
(
566 cluster_id
=cluster_uuid
,
567 kdu_instance
=kdu_instance
,
568 namespace
=instance_info
["namespace"],
574 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
576 raise K8sException(msg
)
579 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
581 # return new revision number
582 instance
= await self
.get_instance_info(
583 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
586 revision
= int(instance
.get("revision"))
587 self
.log
.debug("New revision: {}".format(revision
))
597 total_timeout
: float = 1800,
598 cluster_uuid
: str = None,
599 kdu_model
: str = None,
601 db_dict
: dict = None,
604 """Scale a resource in a Helm Chart.
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
619 True if successful, False otherwise
622 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
624 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
625 resource_name
, kdu_model
, cluster_uuid
628 self
.log
.debug(debug_mgs
)
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
633 if not instance_info
:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
637 paths
, env
= self
._init
_paths
_env
(
638 cluster_name
=cluster_uuid
, create_if_not_exist
=True
642 kdu_model
, version
= self
._split
_version
(kdu_model
)
644 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
646 _
, replica_str
= await self
._get
_replica
_count
_url
(
647 kdu_model
, repo_url
, resource_name
650 command
= self
._get
_upgrade
_scale
_command
(
653 instance_info
["namespace"],
660 paths
["kube_config"],
663 self
.log
.debug("scaling: {}".format(command
))
666 # exec helm in a task
667 exec_task
= asyncio
.ensure_future(
668 coro_or_future
=self
._local
_async
_exec
(
669 command
=command
, raise_exception_on_error
=False, env
=env
672 # write status in another task
673 status_task
= asyncio
.ensure_future(
674 coro_or_future
=self
._store
_status
(
675 cluster_id
=cluster_uuid
,
676 kdu_instance
=kdu_instance
,
677 namespace
=instance_info
["namespace"],
683 # wait for execution task
684 await asyncio
.wait([exec_task
])
688 output
, rc
= exec_task
.result()
691 output
, rc
= await self
._local
_async
_exec
(
692 command
=command
, raise_exception_on_error
=False, env
=env
696 await self
._store
_status
(
697 cluster_id
=cluster_uuid
,
698 kdu_instance
=kdu_instance
,
699 namespace
=instance_info
["namespace"],
705 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
707 raise K8sException(msg
)
710 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
714 async def get_scale_count(
722 """Get a resource scale count.
725 cluster_uuid: The UUID of the cluster
726 resource_name: Resource name
727 kdu_instance: KDU instance name
728 kdu_model: The name or path of an Helm Chart
729 kwargs: Additional parameters
732 Resource instance count
736 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
739 # look for instance to obtain namespace
740 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
741 if not instance_info
:
742 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
745 paths
, _
= self
._init
_paths
_env
(
746 cluster_name
=cluster_uuid
, create_if_not_exist
=True
749 replicas
= await self
._get
_replica
_count
_instance
(
750 kdu_instance
=kdu_instance
,
751 namespace
=instance_info
["namespace"],
752 kubeconfig
=paths
["kube_config"],
753 resource_name
=resource_name
,
757 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
760 # Get default value if scale count is not found from provided values
761 # Important note: this piece of code shall only be executed in the first scaling operation,
762 # since it is expected that the _get_replica_count_instance is able to obtain the number of
763 # replicas when a scale operation was already conducted previously for this KDU/resource!
765 repo_url
= await self
._find
_repo
(
766 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
768 replicas
, _
= await self
._get
_replica
_count
_url
(
769 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
773 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
774 f
"{resource_name} obtained: {replicas}"
778 msg
= "Replica count not found. Cannot be scaled"
780 raise K8sException(msg
)
785 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
788 "rollback kdu_instance {} to revision {} from cluster {}".format(
789 kdu_instance
, revision
, cluster_uuid
794 self
.fs
.sync(from_path
=cluster_uuid
)
796 # look for instance to obtain namespace
797 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
798 if not instance_info
:
799 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
802 paths
, env
= self
._init
_paths
_env
(
803 cluster_name
=cluster_uuid
, create_if_not_exist
=True
807 self
.fs
.sync(from_path
=cluster_uuid
)
809 command
= self
._get
_rollback
_command
(
810 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
813 self
.log
.debug("rolling_back: {}".format(command
))
815 # exec helm in a task
816 exec_task
= asyncio
.ensure_future(
817 coro_or_future
=self
._local
_async
_exec
(
818 command
=command
, raise_exception_on_error
=False, env
=env
821 # write status in another task
822 status_task
= asyncio
.ensure_future(
823 coro_or_future
=self
._store
_status
(
824 cluster_id
=cluster_uuid
,
825 kdu_instance
=kdu_instance
,
826 namespace
=instance_info
["namespace"],
828 operation
="rollback",
832 # wait for execution task
833 await asyncio
.wait([exec_task
])
838 output
, rc
= exec_task
.result()
841 await self
._store
_status
(
842 cluster_id
=cluster_uuid
,
843 kdu_instance
=kdu_instance
,
844 namespace
=instance_info
["namespace"],
846 operation
="rollback",
850 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
852 raise K8sException(msg
)
855 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
857 # return new revision number
858 instance
= await self
.get_instance_info(
859 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
862 revision
= int(instance
.get("revision"))
863 self
.log
.debug("New revision: {}".format(revision
))
868 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
870 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
871 (this call should happen after all _terminate-config-primitive_ of the VNF
874 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
875 :param kdu_instance: unique name for the KDU instance to be deleted
876 :param kwargs: Additional parameters (None yet)
877 :return: True if successful
881 "uninstall kdu_instance {} from cluster {}".format(
882 kdu_instance
, cluster_uuid
887 self
.fs
.sync(from_path
=cluster_uuid
)
889 # look for instance to obtain namespace
890 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
891 if not instance_info
:
892 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
895 paths
, env
= self
._init
_paths
_env
(
896 cluster_name
=cluster_uuid
, create_if_not_exist
=True
900 self
.fs
.sync(from_path
=cluster_uuid
)
902 command
= self
._get
_uninstall
_command
(
903 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
905 output
, _rc
= await self
._local
_async
_exec
(
906 command
=command
, raise_exception_on_error
=True, env
=env
910 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
912 return self
._output
_to
_table
(output
)
914 async def instances_list(self
, cluster_uuid
: str) -> list:
916 returns a list of deployed releases in a cluster
918 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
922 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
925 self
.fs
.sync(from_path
=cluster_uuid
)
927 # execute internal command
928 result
= await self
._instances
_list
(cluster_uuid
)
931 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
935 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
936 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
937 for instance
in instances
:
938 if instance
.get("name") == kdu_instance
:
940 self
.log
.debug("Instance {} not found".format(kdu_instance
))
943 async def upgrade_charm(
947 charm_id
: str = None,
948 charm_type
: str = None,
949 timeout
: float = None,
951 """This method upgrade charms in VNFs
954 ee_id: Execution environment id
955 path: Local path to the charm
957 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
958 timeout: (Float) Timeout for the ns update operation
961 The output of the update operation if status equals to "completed"
963 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
965 async def exec_primitive(
967 cluster_uuid
: str = None,
968 kdu_instance
: str = None,
969 primitive_name
: str = None,
970 timeout
: float = 300,
972 db_dict
: dict = None,
975 """Exec primitive (Juju action)
977 :param cluster_uuid: The UUID of the cluster or namespace:cluster
978 :param kdu_instance: The unique name of the KDU instance
979 :param primitive_name: Name of action that will be executed
980 :param timeout: Timeout for action execution
981 :param params: Dictionary of all the parameters needed for the action
982 :param db_dict: Dictionary for any additional data
983 :param kwargs: Additional parameters (None yet)
985 :return: Returns the output of the action
988 "KDUs deployed with Helm don't support actions "
989 "different from rollback, upgrade and status"
992 async def get_services(
993 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
996 Returns a list of services defined for the specified kdu instance.
998 :param cluster_uuid: UUID of a K8s cluster known by OSM
999 :param kdu_instance: unique name for the KDU instance
1000 :param namespace: K8s namespace used by the KDU instance
1001 :return: If successful, it will return a list of services, Each service
1002 can have the following data:
1003 - `name` of the service
1004 - `type` type of service in the k8 cluster
1005 - `ports` List of ports offered by the service, for each port includes at least
1006 name, port, protocol
1007 - `cluster_ip` Internal ip to be used inside k8s cluster
1008 - `external_ip` List of external ips (in case they are available)
1012 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1013 cluster_uuid
, kdu_instance
1018 paths
, env
= self
._init
_paths
_env
(
1019 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1023 self
.fs
.sync(from_path
=cluster_uuid
)
1025 # get list of services names for kdu
1026 service_names
= await self
._get
_services
(
1027 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1031 for service
in service_names
:
1032 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1033 service_list
.append(service
)
1036 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1040 async def get_service(
1041 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1045 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1046 service_name
, namespace
, cluster_uuid
1051 self
.fs
.sync(from_path
=cluster_uuid
)
1053 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1056 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1060 async def status_kdu(
1061 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1062 ) -> Union
[str, dict]:
1064 This call would retrieve the current state of a given KDU instance. It
1065 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1066 values_ of the configuration parameters applied to a given instance. This call
1067 would be based on the `status` call.
1069 :param cluster_uuid: UUID of a K8s cluster known by OSM
1070 :param kdu_instance: unique name for the KDU instance
1071 :param kwargs: Additional parameters (None yet)
1072 :param yaml_format: if the return shall be returned as an YAML string or as a
1074 :return: If successful, it will return the following vector of arguments:
1075 - K8s `namespace` in the cluster where the KDU lives
1076 - `state` of the KDU instance. It can be:
1083 - List of `resources` (objects) that this release consists of, sorted by kind,
1084 and the status of those resources
1085 - Last `deployment_time`.
1089 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1090 cluster_uuid
, kdu_instance
1095 self
.fs
.sync(from_path
=cluster_uuid
)
1097 # get instance: needed to obtain namespace
1098 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1099 for instance
in instances
:
1100 if instance
.get("name") == kdu_instance
:
1103 # instance does not exist
1105 "Instance name: {} not found in cluster: {}".format(
1106 kdu_instance
, cluster_uuid
1110 status
= await self
._status
_kdu
(
1111 cluster_id
=cluster_uuid
,
1112 kdu_instance
=kdu_instance
,
1113 namespace
=instance
["namespace"],
1114 yaml_format
=yaml_format
,
1115 show_error_log
=True,
1119 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1123 async def get_values_kdu(
1124 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1127 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1129 return await self
._exec
_get
_command
(
1130 get_command
="values",
1131 kdu_instance
=kdu_instance
,
1132 namespace
=namespace
,
1133 kubeconfig
=kubeconfig
,
1136 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1137 """Method to obtain the Helm Chart package's values
1140 kdu_model: The name or path of an Helm Chart
1141 repo_url: Helm Chart repository url
1144 str: the values of the Helm Chart package
1148 "inspect kdu_model values {} from (optional) repo: {}".format(
1153 return await self
._exec
_inspect
_command
(
1154 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1157 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1160 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1163 return await self
._exec
_inspect
_command
(
1164 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1167 async def synchronize_repos(self
, cluster_uuid
: str):
1169 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1171 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1172 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1174 local_repo_list
= await self
.repo_list(cluster_uuid
)
1175 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1177 deleted_repo_list
= []
1178 added_repo_dict
= {}
1180 # iterate over the list of repos in the database that should be
1181 # added if not present
1182 for repo_name
, db_repo
in db_repo_dict
.items():
1184 # check if it is already present
1185 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1186 repo_id
= db_repo
.get("_id")
1187 if curr_repo_url
!= db_repo
["url"]:
1190 "repo {} url changed, delete and and again".format(
1194 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1195 deleted_repo_list
.append(repo_id
)
1198 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1199 if "ca_cert" in db_repo
:
1200 await self
.repo_add(
1204 cert
=db_repo
["ca_cert"],
1207 await self
.repo_add(
1212 added_repo_dict
[repo_id
] = db_repo
["name"]
1213 except Exception as e
:
1215 "Error adding repo id: {}, err_msg: {} ".format(
1220 # Delete repos that are present but not in nbi_list
1221 for repo_name
in local_repo_dict
:
1222 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1223 self
.log
.debug("delete repo {}".format(repo_name
))
1225 await self
.repo_remove(cluster_uuid
, repo_name
)
1226 deleted_repo_list
.append(repo_name
)
1227 except Exception as e
:
1229 "Error deleting repo, name: {}, err_msg: {}".format(
1234 return deleted_repo_list
, added_repo_dict
1236 except K8sException
:
1238 except Exception as e
:
1239 # Log any other error and re-raise it wrapped in a generic Exception
1240 self
.log
.error("Error synchronizing repos: {}".format(e
))
1241 raise Exception("Error synchronizing repos: {}".format(e
))
1243 def _get_db_repos_dict(self
, repo_ids
: list):
1245 for repo_id
in repo_ids
:
1246 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1247 db_repos_dict
[db_repo
["name"]] = db_repo
1248 return db_repos_dict
1251 ####################################################################################
1252 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1253 ####################################################################################
1257 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1259 Creates and returns base cluster and kube dirs and returns them.
1260 Also created helm3 dirs according to new directory specification, paths are
1261 not returned but assigned to helm environment variables
1263 :param cluster_name: cluster_name
1264 :return: Dictionary with config_paths and dictionary with helm environment variables
1268 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1270 Implements the helm version dependent cluster initialization
1274 async def _instances_list(self
, cluster_id
):
1276 Implements the helm version dependent helm instances list
1280 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1282 Implements the helm version dependent method to obtain services from a helm instance
1286 async def _status_kdu(
1290 namespace
: str = None,
1291 yaml_format
: bool = False,
1292 show_error_log
: bool = False,
1293 ) -> Union
[str, dict]:
1295 Implements the helm version dependent method to obtain status of a helm instance
1299 def _get_install_command(
1311 Obtain command to be executed to install the indicated instance
1315 def _get_upgrade_scale_command(
1328 """Obtain command to be executed to upgrade the indicated instance."""
1331 def _get_upgrade_command(
1343 Obtain command to be executed to upgrade the indicated instance
1347 def _get_rollback_command(
1348 self
, kdu_instance
, namespace
, revision
, kubeconfig
1351 Obtain command to be executed to rollback the indicated instance
1355 def _get_uninstall_command(
1356 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1359 Obtain command to be executed to delete the indicated instance
1363 def _get_inspect_command(
1364 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1366 """Generates the command to obtain the information about an Helm Chart package
1367 (´helm show ...´ command)
1370 show_command: the second part of the command (`helm show <show_command>`)
1371 kdu_model: The name or path of an Helm Chart
1372 repo_url: Helm Chart repository url
1373 version: constraint with specific version of the Chart to use
1376 str: the generated Helm Chart command
1380 def _get_get_command(
1381 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1383 """Obtain command to be executed to get information about the kdu instance."""
1386 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1388 Method call to uninstall cluster software for helm. This method is dependent
1390 For Helm v2 it will be called when Tiller must be uninstalled
1391 For Helm v3 it does nothing and does not need to be callled
1395 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1397 Obtains the cluster repos identifiers
1401 ####################################################################################
1402 ################################### P R I V A T E ##################################
1403 ####################################################################################
1407 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1408 if os
.path
.exists(filename
):
1411 msg
= "File {} does not exist".format(filename
)
1412 if exception_if_not_exists
:
1413 raise K8sException(msg
)
1416 def _remove_multiple_spaces(strobj
):
1417 strobj
= strobj
.strip()
1418 while " " in strobj
:
1419 strobj
= strobj
.replace(" ", " ")
1423 def _output_to_lines(output
: str) -> list:
1424 output_lines
= list()
1425 lines
= output
.splitlines(keepends
=False)
1429 output_lines
.append(line
)
1433 def _output_to_table(output
: str) -> list:
1434 output_table
= list()
1435 lines
= output
.splitlines(keepends
=False)
1437 line
= line
.replace("\t", " ")
1439 output_table
.append(line_list
)
1440 cells
= line
.split(sep
=" ")
1444 line_list
.append(cell
)
1448 def _parse_services(output
: str) -> list:
1449 lines
= output
.splitlines(keepends
=False)
1452 line
= line
.replace("\t", " ")
1453 cells
= line
.split(sep
=" ")
1454 if len(cells
) > 0 and cells
[0].startswith("service/"):
1455 elems
= cells
[0].split(sep
="/")
1457 services
.append(elems
[1])
1461 def _get_deep(dictionary
: dict, members
: tuple):
1466 value
= target
.get(m
)
1475 # find key:value in several lines
1477 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1478 for line
in p_lines
:
1480 if line
.startswith(p_key
+ ":"):
1481 parts
= line
.split(":")
1482 the_value
= parts
[1].strip()
1490 def _lower_keys_list(input_list
: list):
1492 Transform the keys in a list of dictionaries to lower case and returns a new list
1497 for dictionary
in input_list
:
1498 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1499 new_list
.append(new_dict
)
1502 async def _local_async_exec(
1505 raise_exception_on_error
: bool = False,
1506 show_error_log
: bool = True,
1507 encode_utf8
: bool = False,
1511 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1513 "Executing async local command: {}, env: {}".format(command
, env
)
1517 command
= shlex
.split(command
)
1519 environ
= os
.environ
.copy()
1524 async with self
.cmd_lock
:
1525 process
= await asyncio
.create_subprocess_exec(
1527 stdout
=asyncio
.subprocess
.PIPE
,
1528 stderr
=asyncio
.subprocess
.PIPE
,
1532 # wait for command terminate
1533 stdout
, stderr
= await process
.communicate()
1535 return_code
= process
.returncode
1539 output
= stdout
.decode("utf-8").strip()
1540 # output = stdout.decode()
1542 output
= stderr
.decode("utf-8").strip()
1543 # output = stderr.decode()
1545 if return_code
!= 0 and show_error_log
:
1547 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1550 self
.log
.debug("Return code: {}".format(return_code
))
1552 if raise_exception_on_error
and return_code
!= 0:
1553 raise K8sException(output
)
1556 output
= output
.encode("utf-8").strip()
1557 output
= str(output
).replace("\\n", "\n")
1559 return output
, return_code
1561 except asyncio
.CancelledError
:
1562 # first, kill the process if it is still running
1563 if process
.returncode
is None:
1566 except K8sException
:
1568 except Exception as e
:
1569 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1571 if raise_exception_on_error
:
1572 raise K8sException(e
) from e
1576 async def _local_async_exec_pipe(
1580 raise_exception_on_error
: bool = True,
1581 show_error_log
: bool = True,
1582 encode_utf8
: bool = False,
1586 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1587 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1588 command
= "{} | {}".format(command1
, command2
)
1590 "Executing async local command: {}, env: {}".format(command
, env
)
1594 command1
= shlex
.split(command1
)
1595 command2
= shlex
.split(command2
)
1597 environ
= os
.environ
.copy()
1602 async with self
.cmd_lock
:
1603 read
, write
= os
.pipe()
1604 process_1
= await asyncio
.create_subprocess_exec(
1605 *command1
, stdout
=write
, env
=environ
1608 process_2
= await asyncio
.create_subprocess_exec(
1609 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1612 stdout
, stderr
= await process_2
.communicate()
1614 return_code
= process_2
.returncode
1618 output
= stdout
.decode("utf-8").strip()
1619 # output = stdout.decode()
1621 output
= stderr
.decode("utf-8").strip()
1622 # output = stderr.decode()
1624 if return_code
!= 0 and show_error_log
:
1626 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1629 self
.log
.debug("Return code: {}".format(return_code
))
1631 if raise_exception_on_error
and return_code
!= 0:
1632 raise K8sException(output
)
1635 output
= output
.encode("utf-8").strip()
1636 output
= str(output
).replace("\\n", "\n")
1638 return output
, return_code
1639 except asyncio
.CancelledError
:
1640 # first, kill the processes if they are still running
1641 for process
in (process_1
, process_2
):
1642 if process
.returncode
is None:
1645 except K8sException
:
1647 except Exception as e
:
1648 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1650 if raise_exception_on_error
:
1651 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # The ingress list is absent while the load balancer is still being
        # provisioned, and entries may carry a hostname instead of an ip;
        # neither case should abort the query.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1699 async def _exec_get_command(
1700 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1702 """Obtains information about the kdu instance."""
1704 full_command
= self
._get
_get
_command
(
1705 get_command
, kdu_instance
, namespace
, kubeconfig
1708 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1712 async def _exec_inspect_command(
1713 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1715 """Obtains information about an Helm Chart package (´helm show´ command)
1718 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1719 kdu_model: The name or path of an Helm Chart
1720 repo_url: Helm Chart repository url
1723 str: the requested info about the Helm Chart package
1728 repo_str
= " --repo {}".format(repo_url
)
1730 # Obtain the Chart's name and store it in the var kdu_model
1731 kdu_model
, _
= self
._split
_repo
(kdu_model
=kdu_model
)
1733 kdu_model
, version
= self
._split
_version
(kdu_model
)
1735 version_str
= "--version {}".format(version
)
1739 full_command
= self
._get
_inspect
_command
(
1740 show_command
=inspect_command
,
1741 kdu_model
=kdu_model
,
1743 version
=version_str
,
1746 output
, _
= await self
._local
_async
_exec
(command
=full_command
)
1750 async def _get_replica_count_url(
1753 repo_url
: str = None,
1754 resource_name
: str = None,
1756 """Get the replica count value in the Helm Chart Values.
1759 kdu_model: The name or path of an Helm Chart
1760 repo_url: Helm Chart repository url
1761 resource_name: Resource name
1765 - The number of replicas of the specific instance; if not found, returns None; and
1766 - The string corresponding to the replica count key in the Helm values
1769 kdu_values
= yaml
.load(
1770 await self
.values_kdu(kdu_model
=kdu_model
, repo_url
=repo_url
),
1771 Loader
=yaml
.SafeLoader
,
1774 self
.log
.debug(f
"Obtained the Helm package values for the KDU: {kdu_values}")
1778 "kdu_values not found for kdu_model {}".format(kdu_model
)
1782 kdu_values
= kdu_values
.get(resource_name
, None)
1785 msg
= "resource {} not found in the values in model {}".format(
1786 resource_name
, kdu_model
1789 raise K8sException(msg
)
1791 duplicate_check
= False
1796 if kdu_values
.get("replicaCount") is not None:
1797 replicas
= kdu_values
["replicaCount"]
1798 replica_str
= "replicaCount"
1799 elif kdu_values
.get("replicas") is not None:
1800 duplicate_check
= True
1801 replicas
= kdu_values
["replicas"]
1802 replica_str
= "replicas"
1806 "replicaCount or replicas not found in the resource"
1807 "{} values in model {}. Cannot be scaled".format(
1808 resource_name
, kdu_model
1813 "replicaCount or replicas not found in the values"
1814 "in model {}. Cannot be scaled".format(kdu_model
)
1817 raise K8sException(msg
)
1819 # Control if replicas and replicaCount exists at the same time
1820 msg
= "replicaCount and replicas are exists at the same time"
1822 if "replicaCount" in kdu_values
:
1824 raise K8sException(msg
)
1826 if "replicas" in kdu_values
:
1828 raise K8sException(msg
)
1830 return replicas
, replica_str
1832 async def _get_replica_count_instance(
1837 resource_name
: str = None,
1839 """Get the replica count value in the instance.
1842 kdu_instance: The name of the KDU instance
1843 namespace: KDU instance namespace
1845 resource_name: Resource name
1848 The number of replicas of the specific instance; if not found, returns None
1851 kdu_values
= yaml
.load(
1852 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1853 Loader
=yaml
.SafeLoader
,
1856 self
.log
.debug(f
"Obtained the Helm values for the KDU instance: {kdu_values}")
1862 kdu_values
.get(resource_name
, None) if resource_name
else None
1865 for replica_str
in ("replicaCount", "replicas"):
1867 replicas
= resource_values
.get(replica_str
)
1869 replicas
= kdu_values
.get(replica_str
)
1871 if replicas
is not None:
1876 async def _store_status(
1881 namespace
: str = None,
1882 db_dict
: dict = None,
1885 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1887 :param cluster_id (str): the cluster where the KDU instance is deployed
1888 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1889 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1890 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1891 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1892 values for the keys:
1893 - "collection": The Mongo DB collection to write to
1894 - "filter": The query filter to use in the update process
1895 - "path": The dot separated keys which targets the object to be updated
1900 detailed_status
= await self
._status
_kdu
(
1901 cluster_id
=cluster_id
,
1902 kdu_instance
=kdu_instance
,
1904 namespace
=namespace
,
1907 status
= detailed_status
.get("info").get("description")
1908 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1910 # write status to db
1911 result
= await self
.write_app_status_to_db(
1914 detailed_status
=str(detailed_status
),
1915 operation
=operation
,
1919 self
.log
.info("Error writing in database. Task exiting...")
1921 except asyncio
.CancelledError
as e
:
1923 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1925 except Exception as e
:
1926 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1928 # params for use in -f file
1929 # returns values file option and filename (in order to delete it at the end)
1930 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1932 if params
and len(params
) > 0:
1933 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1935 def get_random_number():
1936 r
= random
.randrange(start
=1, stop
=99999999)
1944 value
= params
.get(key
)
1945 if "!!yaml" in str(value
):
1946 value
= yaml
.safe_load(value
[7:])
1947 params2
[key
] = value
1949 values_file
= get_random_number() + ".yaml"
1950 with
open(values_file
, "w") as stream
:
1951 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1953 return "-f {}".format(values_file
), values_file
1957 # params for use in --set option
1959 def _params_to_set_option(params
: dict) -> str:
1961 if params
and len(params
) > 0:
1964 value
= params
.get(key
, None)
1965 if value
is not None:
1967 params_str
+= "--set "
1971 params_str
+= "{}={}".format(key
, value
)
def generate_kdu_instance_name(**kwargs):
    """
    Build a release name for a KDU instance from its chart reference.

    The chart reference is reduced to its last path segment when it is an
    absolute path or a URL, every character that is not a letter or digit is
    replaced by ``-``, the result is limited to 35 characters, prefixed with
    ``a`` when it does not start with a letter, and completed with ``-`` plus a
    zero-padded 10-digit random number. The final name is lower-cased.

    :param kwargs: must contain ``kdu_model`` (str), the chart reference
    :return: the generated instance name
    :raises KeyError: if ``kdu_model`` is not supplied
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last path segment
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )[:35]

    # if does not start with alpha character, prefix 'a'; the emptiness check
    # covers references that end in "/" and leave no chart name at all
    if not name or not name[0].isalpha():
        name = "a" + name

    # random numbers only need to make names distinct, not be unpredictable
    suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")

    return (name + "-" + suffix).lower()
2010 def _split_version(self
, kdu_model
: str) -> (str, str):
2012 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
2013 parts
= kdu_model
.split(sep
=":")
2015 version
= str(parts
[1])
2016 kdu_model
= parts
[0]
2017 return kdu_model
, version
2019 def _split_repo(self
, kdu_model
: str) -> (str, str):
2020 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2023 kdu_model (str): Associated KDU model
2026 (str, str): Tuple with the Chart name in index 0, and the repo name
2027 in index 2; if there was a problem finding them, return None
2034 idx
= kdu_model
.find("/")
2036 chart_name
= kdu_model
[idx
+ 1 :]
2037 repo_name
= kdu_model
[:idx
]
2039 return chart_name
, repo_name
2041 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2042 """Obtain the Helm repository for an Helm Chart
2045 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2046 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2049 str: the repository URL; if Helm Chart is a local one, the function returns None
2052 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2056 # Find repository link
2057 local_repo_list
= await self
.repo_list(cluster_uuid
)
2058 for repo
in local_repo_list
:
2059 if repo
["name"] == repo_name
:
2060 repo_url
= repo
["url"]
2061 break # it is not necessary to continue the loop if the repo link was found...