2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 def _get_namespace(self
, cluster_uuid
: str) -> str:
95 Obtains the namespace used by the cluster with the uuid passed by argument
97 param: cluster_uuid: cluster's uuid
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster
= self
.db
.get_one(
102 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
104 return k8scluster
.get("namespace")
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
159 repo_type
: str = "chart",
162 password
: str = None,
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid
, repo_type
, name
, url
171 paths
, env
= self
._init
_paths
_env
(
172 cluster_name
=cluster_uuid
, create_if_not_exist
=True
176 self
.fs
.sync(from_path
=cluster_uuid
)
178 # helm repo add name url
179 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
184 temp_cert_file
= os
.path
.join(
185 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
187 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
188 with
open(temp_cert_file
, "w") as the_cert
:
190 command
+= " --ca-file {}".format(temp_cert_file
)
193 command
+= " --username={}".format(user
)
196 command
+= " --password={}".format(password
)
198 self
.log
.debug("adding repo: {}".format(command
))
199 await self
._local
_async
_exec
(
200 command
=command
, raise_exception_on_error
=True, env
=env
204 command
= "env KUBECONFIG={} {} repo update {}".format(
205 paths
["kube_config"], self
._helm
_command
, name
207 self
.log
.debug("updating repo: {}".format(command
))
208 await self
._local
_async
_exec
(
209 command
=command
, raise_exception_on_error
=False, env
=env
213 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Run "<helm> repo update <name>" for the given cluster.

    :param cluster_uuid: cluster identifier, used for the paths/env lookup and
        as the path handed to self.fs.sync / self.fs.reverse_sync
    :param name: repository name appended to the helm command
    :param repo_type: only used in the debug message
    :return: None
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(cluster_uuid, repo_type, name)
    )

    # Only the environment is used here; the paths dictionary is not needed
    # because the command is built without an explicit kubeconfig.
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    self.fs.sync(from_path=cluster_uuid)

    update_cmd = "{} repo update {}".format(self._helm_command, name)
    self.log.debug("updating repo: {}".format(update_cmd))
    # Executed with raise_exception_on_error=False; the result is discarded.
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=helm_env
    )

    self.fs.reverse_sync(from_path=cluster_uuid)
240 async def repo_list(self
, cluster_uuid
: str) -> list:
242 Get the list of registered repositories
244 :return: list of registered repositories: [ (name, url) .... ]
247 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
250 paths
, env
= self
._init
_paths
_env
(
251 cluster_name
=cluster_uuid
, create_if_not_exist
=True
255 self
.fs
.sync(from_path
=cluster_uuid
)
257 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths
["kube_config"], self
._helm
_command
261 # Set exception to false because if there are no repos just want an empty list
262 output
, _rc
= await self
._local
_async
_exec
(
263 command
=command
, raise_exception_on_error
=False, env
=env
267 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
270 if output
and len(output
) > 0:
271 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self
._lower
_keys
_list
(repos
)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Run "<helm> repo remove <name>" against the cluster's kubeconfig.

    :param cluster_uuid: cluster identifier, used for the paths/env lookup and
        as the path handed to self.fs.sync / self.fs.reverse_sync
    :param name: repository name appended to the helm command
    :return: None
    """
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_uuid))

    cluster_paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    self.fs.sync(from_path=cluster_uuid)

    kubeconfig = cluster_paths["kube_config"]
    remove_cmd = "env KUBECONFIG={} {} repo remove {}".format(
        kubeconfig, self._helm_command, name
    )
    # Executed with raise_exception_on_error=True, unlike the repo update call.
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=helm_env
    )

    self.fs.reverse_sync(from_path=cluster_uuid)
306 uninstall_sw
: bool = False,
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to also uninstall the cluster software for helm
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
319 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid
, uninstall_sw
327 self
.fs
.sync(from_path
=cluster_uuid
)
329 # uninstall releases if needed.
331 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
332 if len(releases
) > 0:
336 kdu_instance
= r
.get("name")
337 chart
= r
.get("chart")
339 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
341 await self
.uninstall(
342 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
344 except Exception as e
:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
349 "Error uninstalling release {}: {}".format(
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid
)
359 False # Allow to remove k8s cluster without removing Tiller
363 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
365 # delete cluster directory
366 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
367 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
368 # Also remove the local directory if it still exists
369 direct
= self
.fs
.path
+ "/" + cluster_uuid
370 shutil
.rmtree(direct
, ignore_errors
=True)
374 def _is_helm_chart_a_file(self
, chart_name
: str):
375 return chart_name
.count("/") > 1
377 async def _install_impl(
385 timeout
: float = 300,
387 db_dict
: dict = None,
388 kdu_name
: str = None,
389 namespace
: str = None,
392 paths
, env
= self
._init
_paths
_env
(
393 cluster_name
=cluster_id
, create_if_not_exist
=True
397 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
398 cluster_id
=cluster_id
, params
=params
402 kdu_model
, version
= self
._split
_version
(kdu_model
)
404 repo
= self
._split
_repo
(kdu_model
)
406 self
.repo_update(cluster_id
, repo
)
408 command
= self
._get
_install
_command
(
416 paths
["kube_config"],
419 self
.log
.debug("installing: {}".format(command
))
422 # exec helm in a task
423 exec_task
= asyncio
.ensure_future(
424 coro_or_future
=self
._local
_async
_exec
(
425 command
=command
, raise_exception_on_error
=False, env
=env
429 # write status in another task
430 status_task
= asyncio
.ensure_future(
431 coro_or_future
=self
._store
_status
(
432 cluster_id
=cluster_id
,
433 kdu_instance
=kdu_instance
,
440 # wait for execution task
441 await asyncio
.wait([exec_task
])
446 output
, rc
= exec_task
.result()
450 output
, rc
= await self
._local
_async
_exec
(
451 command
=command
, raise_exception_on_error
=False, env
=env
454 # remove temporal values yaml file
456 os
.remove(file_to_delete
)
459 await self
._store
_status
(
460 cluster_id
=cluster_id
,
461 kdu_instance
=kdu_instance
,
468 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
470 raise K8sException(msg
)
476 kdu_model
: str = None,
478 timeout
: float = 300,
480 db_dict
: dict = None,
482 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
485 self
.fs
.sync(from_path
=cluster_uuid
)
487 # look for instance to obtain namespace
488 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
489 if not instance_info
:
490 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
493 paths
, env
= self
._init
_paths
_env
(
494 cluster_name
=cluster_uuid
, create_if_not_exist
=True
498 self
.fs
.sync(from_path
=cluster_uuid
)
501 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
502 cluster_id
=cluster_uuid
, params
=params
506 kdu_model
, version
= self
._split
_version
(kdu_model
)
508 repo
= self
._split
_repo
(kdu_model
)
510 self
.repo_update(cluster_uuid
, repo
)
512 command
= self
._get
_upgrade
_command
(
515 instance_info
["namespace"],
520 paths
["kube_config"],
523 self
.log
.debug("upgrading: {}".format(command
))
527 # exec helm in a task
528 exec_task
= asyncio
.ensure_future(
529 coro_or_future
=self
._local
_async
_exec
(
530 command
=command
, raise_exception_on_error
=False, env
=env
533 # write status in another task
534 status_task
= asyncio
.ensure_future(
535 coro_or_future
=self
._store
_status
(
536 cluster_id
=cluster_uuid
,
537 kdu_instance
=kdu_instance
,
538 namespace
=instance_info
["namespace"],
544 # wait for execution task
545 await asyncio
.wait([exec_task
])
549 output
, rc
= exec_task
.result()
553 output
, rc
= await self
._local
_async
_exec
(
554 command
=command
, raise_exception_on_error
=False, env
=env
557 # remove temporal values yaml file
559 os
.remove(file_to_delete
)
562 await self
._store
_status
(
563 cluster_id
=cluster_uuid
,
564 kdu_instance
=kdu_instance
,
565 namespace
=instance_info
["namespace"],
571 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
573 raise K8sException(msg
)
576 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
578 # return new revision number
579 instance
= await self
.get_instance_info(
580 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
583 revision
= int(instance
.get("revision"))
584 self
.log
.debug("New revision: {}".format(revision
))
594 total_timeout
: float = 1800,
595 cluster_uuid
: str = None,
596 kdu_model
: str = None,
598 db_dict
: dict = None,
601 """Scale a resource in a Helm Chart.
604 kdu_instance: KDU instance name
605 scale: Scale to which to set the resource
606 resource_name: Resource name
607 total_timeout: The time, in seconds, to wait
608 cluster_uuid: The UUID of the cluster
609 kdu_model: The chart reference
610 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
611 The --wait flag will be set automatically if --atomic is used
612 db_dict: Dictionary for any additional data
613 kwargs: Additional parameters
616 True if successful, False otherwise
619 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
621 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
622 resource_name
, kdu_model
, cluster_uuid
625 self
.log
.debug(debug_mgs
)
627 # look for instance to obtain namespace
628 # get_instance_info function calls the sync command
629 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
630 if not instance_info
:
631 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
634 paths
, env
= self
._init
_paths
_env
(
635 cluster_name
=cluster_uuid
, create_if_not_exist
=True
639 kdu_model
, version
= self
._split
_version
(kdu_model
)
641 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
643 _
, replica_str
= await self
._get
_replica
_count
_url
(
644 kdu_model
, repo_url
, resource_name
647 command
= self
._get
_upgrade
_scale
_command
(
650 instance_info
["namespace"],
657 paths
["kube_config"],
660 self
.log
.debug("scaling: {}".format(command
))
663 # exec helm in a task
664 exec_task
= asyncio
.ensure_future(
665 coro_or_future
=self
._local
_async
_exec
(
666 command
=command
, raise_exception_on_error
=False, env
=env
669 # write status in another task
670 status_task
= asyncio
.ensure_future(
671 coro_or_future
=self
._store
_status
(
672 cluster_id
=cluster_uuid
,
673 kdu_instance
=kdu_instance
,
674 namespace
=instance_info
["namespace"],
680 # wait for execution task
681 await asyncio
.wait([exec_task
])
685 output
, rc
= exec_task
.result()
688 output
, rc
= await self
._local
_async
_exec
(
689 command
=command
, raise_exception_on_error
=False, env
=env
693 await self
._store
_status
(
694 cluster_id
=cluster_uuid
,
695 kdu_instance
=kdu_instance
,
696 namespace
=instance_info
["namespace"],
702 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
704 raise K8sException(msg
)
707 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
711 async def get_scale_count(
719 """Get a resource scale count.
722 cluster_uuid: The UUID of the cluster
723 resource_name: Resource name
724 kdu_instance: KDU instance name
725 kdu_model: The name or path of an Helm Chart
726 kwargs: Additional parameters
729 Resource instance count
733 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
736 # look for instance to obtain namespace
737 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
738 if not instance_info
:
739 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
742 paths
, env
= self
._init
_paths
_env
(
743 cluster_name
=cluster_uuid
, create_if_not_exist
=True
746 replicas
= await self
._get
_replica
_count
_instance
(
747 kdu_instance
=kdu_instance
,
748 namespace
=instance_info
["namespace"],
749 kubeconfig
=paths
["kube_config"],
750 resource_name
=resource_name
,
753 # Get default value if scale count is not found from provided values
755 repo_url
= await self
._find
_repo
(
756 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
758 replicas
, _
= await self
._get
_replica
_count
_url
(
759 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
763 msg
= "Replica count not found. Cannot be scaled"
765 raise K8sException(msg
)
770 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
773 "rollback kdu_instance {} to revision {} from cluster {}".format(
774 kdu_instance
, revision
, cluster_uuid
779 self
.fs
.sync(from_path
=cluster_uuid
)
781 # look for instance to obtain namespace
782 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
783 if not instance_info
:
784 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
787 paths
, env
= self
._init
_paths
_env
(
788 cluster_name
=cluster_uuid
, create_if_not_exist
=True
792 self
.fs
.sync(from_path
=cluster_uuid
)
794 command
= self
._get
_rollback
_command
(
795 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
798 self
.log
.debug("rolling_back: {}".format(command
))
800 # exec helm in a task
801 exec_task
= asyncio
.ensure_future(
802 coro_or_future
=self
._local
_async
_exec
(
803 command
=command
, raise_exception_on_error
=False, env
=env
806 # write status in another task
807 status_task
= asyncio
.ensure_future(
808 coro_or_future
=self
._store
_status
(
809 cluster_id
=cluster_uuid
,
810 kdu_instance
=kdu_instance
,
811 namespace
=instance_info
["namespace"],
813 operation
="rollback",
817 # wait for execution task
818 await asyncio
.wait([exec_task
])
823 output
, rc
= exec_task
.result()
826 await self
._store
_status
(
827 cluster_id
=cluster_uuid
,
828 kdu_instance
=kdu_instance
,
829 namespace
=instance_info
["namespace"],
831 operation
="rollback",
835 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
837 raise K8sException(msg
)
840 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
842 # return new revision number
843 instance
= await self
.get_instance_info(
844 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
847 revision
= int(instance
.get("revision"))
848 self
.log
.debug("New revision: {}".format(revision
))
853 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
855 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
856 (this call should happen after all _terminate-config-primitive_ of the VNF
859 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
860 :param kdu_instance: unique name for the KDU instance to be deleted
861 :param kwargs: Additional parameters (None yet)
862 :return: True if successful
866 "uninstall kdu_instance {} from cluster {}".format(
867 kdu_instance
, cluster_uuid
872 self
.fs
.sync(from_path
=cluster_uuid
)
874 # look for instance to obtain namespace
875 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
876 if not instance_info
:
877 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
880 paths
, env
= self
._init
_paths
_env
(
881 cluster_name
=cluster_uuid
, create_if_not_exist
=True
885 self
.fs
.sync(from_path
=cluster_uuid
)
887 command
= self
._get
_uninstall
_command
(
888 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
890 output
, _rc
= await self
._local
_async
_exec
(
891 command
=command
, raise_exception_on_error
=True, env
=env
895 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
897 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the deployed releases of a cluster.

    The listing itself is delegated to the helm-version specific
    self._instances_list; this wrapper adds logging and calls self.fs.sync
    before and self.fs.reverse_sync after it.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the value produced by self._instances_list, unchanged
    """
    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    self.fs.sync(from_path=cluster_uuid)
    releases = await self._instances_list(cluster_uuid)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up one release of a cluster by its name.

    :param cluster_uuid: cluster whose releases are listed via instances_list
    :param kdu_instance: value compared against each release's "name" entry
    :return: the first matching release, or None (after a debug message)
        when no release carries that name
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)

    # The first hit wins, exactly as a loop with an early return would behave.
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is not None:
        return found

    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
928 async def upgrade_charm(
932 charm_id
: str = None,
933 charm_type
: str = None,
934 timeout
: float = None,
936 """This method upgrade charms in VNFs
939 ee_id: Execution environment id
940 path: Local path to the charm
942 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
943 timeout: (Float) Timeout for the ns update operation
946 The output of the update operation if status equals to "completed"
948 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
950 async def exec_primitive(
952 cluster_uuid
: str = None,
953 kdu_instance
: str = None,
954 primitive_name
: str = None,
955 timeout
: float = 300,
957 db_dict
: dict = None,
960 """Exec primitive (Juju action)
962 :param cluster_uuid: The UUID of the cluster or namespace:cluster
963 :param kdu_instance: The unique name of the KDU instance
964 :param primitive_name: Name of action that will be executed
965 :param timeout: Timeout for action execution
966 :param params: Dictionary of all the parameters needed for the action
967 :db_dict: Dictionary for any additional data
968 :param kwargs: Additional parameters (None yet)
970 :return: Returns the output of the action
973 "KDUs deployed with Helm don't support actions "
974 "different from rollback, upgrade and status"
977 async def get_services(
978 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
981 Returns a list of services defined for the specified kdu instance.
983 :param cluster_uuid: UUID of a K8s cluster known by OSM
984 :param kdu_instance: unique name for the KDU instance
985 :param namespace: K8s namespace used by the KDU instance
986 :return: If successful, it will return a list of services, Each service
987 can have the following data:
988 - `name` of the service
989 - `type` type of service in the k8 cluster
990 - `ports` List of ports offered by the service, for each port includes at least
992 - `cluster_ip` Internal ip to be used inside k8s cluster
993 - `external_ip` List of external ips (in case they are available)
997 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
998 cluster_uuid
, kdu_instance
1003 paths
, env
= self
._init
_paths
_env
(
1004 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1008 self
.fs
.sync(from_path
=cluster_uuid
)
1010 # get list of services names for kdu
1011 service_names
= await self
._get
_services
(
1012 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1016 for service
in service_names
:
1017 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1018 service_list
.append(service
)
1021 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1025 async def get_service(
1026 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1030 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1031 service_name
, namespace
, cluster_uuid
1036 self
.fs
.sync(from_path
=cluster_uuid
)
1038 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1041 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1045 async def status_kdu(
1046 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1047 ) -> Union
[str, dict]:
1049 This call would retrieve the current state of a given KDU instance. It
1050 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1051 values_ of the configuration parameters applied to a given instance. This call
1052 would be based on the `status` call.
1054 :param cluster_uuid: UUID of a K8s cluster known by OSM
1055 :param kdu_instance: unique name for the KDU instance
1056 :param kwargs: Additional parameters (None yet)
1057 :param yaml_format: if the return shall be returned as an YAML string or as a
1059 :return: If successful, it will return the following vector of arguments:
1060 - K8s `namespace` in the cluster where the KDU lives
1061 - `state` of the KDU instance. It can be:
1068 - List of `resources` (objects) that this release consists of, sorted by kind,
1069 and the status of those resources
1070 - Last `deployment_time`.
1074 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1075 cluster_uuid
, kdu_instance
1080 self
.fs
.sync(from_path
=cluster_uuid
)
1082 # get instance: needed to obtain namespace
1083 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1084 for instance
in instances
:
1085 if instance
.get("name") == kdu_instance
:
1088 # instance does not exist
1090 "Instance name: {} not found in cluster: {}".format(
1091 kdu_instance
, cluster_uuid
1095 status
= await self
._status
_kdu
(
1096 cluster_id
=cluster_uuid
,
1097 kdu_instance
=kdu_instance
,
1098 namespace
=instance
["namespace"],
1099 yaml_format
=yaml_format
,
1100 show_error_log
=True,
1104 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1108 async def get_values_kdu(
1109 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1112 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1114 return await self
._exec
_get
_command
(
1115 get_command
="values",
1116 kdu_instance
=kdu_instance
,
1117 namespace
=namespace
,
1118 kubeconfig
=kubeconfig
,
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Obtain the values of a Helm Chart package.

    Delegates to self._exec_inspect_command with inspect_command="values".

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the values of the Helm Chart package
    """
    log_line = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(log_line)

    chart_values = await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_values
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Obtain the readme of a Helm Chart package.

    Delegates to self._exec_inspect_command with inspect_command="readme".

    Args:
        kdu_model: chart reference forwarded to the inspect command
        repo_url: repository url forwarded to the inspect command

    Returns:
        str: whatever self._exec_inspect_command returns
    """
    log_line = "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    self.log.debug(log_line)

    readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
1152 async def synchronize_repos(self
, cluster_uuid
: str):
1154 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1156 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1157 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1159 local_repo_list
= await self
.repo_list(cluster_uuid
)
1160 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1162 deleted_repo_list
= []
1163 added_repo_dict
= {}
1165 # iterate over the list of repos in the database that should be
1166 # added if not present
1167 for repo_name
, db_repo
in db_repo_dict
.items():
1169 # check if it is already present
1170 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1171 repo_id
= db_repo
.get("_id")
1172 if curr_repo_url
!= db_repo
["url"]:
1175 "repo {} url changed, delete and and again".format(
1179 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1180 deleted_repo_list
.append(repo_id
)
1183 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1184 if "ca_cert" in db_repo
:
1185 await self
.repo_add(
1189 cert
=db_repo
["ca_cert"],
1192 await self
.repo_add(
1197 added_repo_dict
[repo_id
] = db_repo
["name"]
1198 except Exception as e
:
1200 "Error adding repo id: {}, err_msg: {} ".format(
1205 # Delete repos that are present but not in nbi_list
1206 for repo_name
in local_repo_dict
:
1207 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1208 self
.log
.debug("delete repo {}".format(repo_name
))
1210 await self
.repo_remove(cluster_uuid
, repo_name
)
1211 deleted_repo_list
.append(repo_name
)
1212 except Exception as e
:
1214 "Error deleting repo, name: {}, err_msg: {}".format(
1219 return deleted_repo_list
, added_repo_dict
1221 except K8sException
:
1223 except Exception as e
:
1224 # Do not raise errors synchronizing repos
1225 self
.log
.error("Error synchronizing repos: {}".format(e
))
1226 raise Exception("Error synchronizing repos: {}".format(e
))
1228 def _get_db_repos_dict(self
, repo_ids
: list):
1230 for repo_id
in repo_ids
:
1231 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1232 db_repos_dict
[db_repo
["name"]] = db_repo
1233 return db_repos_dict
1236 ####################################################################################
1237 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1238 ####################################################################################
1242 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1244 Creates and returns base cluster and kube dirs and returns them.
1245 Also created helm3 dirs according to new directory specification, paths are
1246 not returned but assigned to helm environment variables
1248 :param cluster_name: cluster_name
1249 :return: Dictionary with config_paths and dictionary with helm environment variables
1253 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1255 Implements the helm version dependent cluster initialization
1259 async def _instances_list(self
, cluster_id
):
1261 Implements the helm version dependent helm instances list
1265 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1267 Implements the helm version dependent method to obtain services from a helm instance
1271 async def _status_kdu(
1275 namespace
: str = None,
1276 yaml_format
: bool = False,
1277 show_error_log
: bool = False,
1278 ) -> Union
[str, dict]:
1280 Implements the helm version dependent method to obtain status of a helm instance
1284 def _get_install_command(
1296 Obtain command to be executed to delete the indicated instance
1300 def _get_upgrade_scale_command(
1313 """Obtain command to be executed to upgrade the indicated instance."""
1316 def _get_upgrade_command(
1328 Obtain command to be executed to upgrade the indicated instance
1332 def _get_rollback_command(
1333 self
, kdu_instance
, namespace
, revision
, kubeconfig
1336 Obtain command to be executed to rollback the indicated instance
1340 def _get_uninstall_command(
1341 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1344 Obtain command to be executed to delete the indicated instance
1348 def _get_inspect_command(
1349 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1351 """Generates the command to obtain the information about an Helm Chart package
1352 (´helm show ...´ command)
1355 show_command: the second part of the command (`helm show <show_command>`)
1356 kdu_model: The name or path of an Helm Chart
1357 repo_url: Helm Chart repository url
1358 version: constraint with specific version of the Chart to use
1361 str: the generated Helm Chart command
1365 def _get_get_command(
1366 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1368 """Obtain command to be executed to get information about the kdu instance."""
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    on the helm version.
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: cluster from which the helm software is removed
    :param namespace: namespace involved in the uninstall
    """
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    This base-class definition carries no logic of its own.

    :param cluster_uuid: cluster whose helm chart repositories are requested
    :return: list with the repository identifiers
    """
1386 ####################################################################################
1387 ################################### P R I V A T E ##################################
1388 ####################################################################################
1392 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1393 if os
.path
.exists(filename
):
1396 msg
= "File {} does not exist".format(filename
)
1397 if exception_if_not_exists
:
1398 raise K8sException(msg
)
1401 def _remove_multiple_spaces(strobj
):
1402 strobj
= strobj
.strip()
1403 while " " in strobj
:
1404 strobj
= strobj
.replace(" ", " ")
1408 def _output_to_lines(output
: str) -> list:
1409 output_lines
= list()
1410 lines
= output
.splitlines(keepends
=False)
1414 output_lines
.append(line
)
1418 def _output_to_table(output
: str) -> list:
1419 output_table
= list()
1420 lines
= output
.splitlines(keepends
=False)
1422 line
= line
.replace("\t", " ")
1424 output_table
.append(line_list
)
1425 cells
= line
.split(sep
=" ")
1429 line_list
.append(cell
)
1433 def _parse_services(output
: str) -> list:
1434 lines
= output
.splitlines(keepends
=False)
1437 line
= line
.replace("\t", " ")
1438 cells
= line
.split(sep
=" ")
1439 if len(cells
) > 0 and cells
[0].startswith("service/"):
1440 elems
= cells
[0].split(sep
="/")
1442 services
.append(elems
[1])
1446 def _get_deep(dictionary
: dict, members
: tuple):
1451 value
= target
.get(m
)
1460 # find key:value in several lines
1462 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1463 for line
in p_lines
:
1465 if line
.startswith(p_key
+ ":"):
1466 parts
= line
.split(":")
1467 the_value
= parts
[1].strip()
1475 def _lower_keys_list(input_list
: list):
1477 Transform the keys in a list of dictionaries to lower case and returns a new list
1482 for dictionary
in input_list
:
1483 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1484 new_list
.append(new_dict
)
1487 async def _local_async_exec(
1490 raise_exception_on_error
: bool = False,
1491 show_error_log
: bool = True,
1492 encode_utf8
: bool = False,
1496 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1498 "Executing async local command: {}, env: {}".format(command
, env
)
1502 command
= shlex
.split(command
)
1504 environ
= os
.environ
.copy()
1509 process
= await asyncio
.create_subprocess_exec(
1511 stdout
=asyncio
.subprocess
.PIPE
,
1512 stderr
=asyncio
.subprocess
.PIPE
,
1516 # wait for command terminate
1517 stdout
, stderr
= await process
.communicate()
1519 return_code
= process
.returncode
1523 output
= stdout
.decode("utf-8").strip()
1524 # output = stdout.decode()
1526 output
= stderr
.decode("utf-8").strip()
1527 # output = stderr.decode()
1529 if return_code
!= 0 and show_error_log
:
1531 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1534 self
.log
.debug("Return code: {}".format(return_code
))
1536 if raise_exception_on_error
and return_code
!= 0:
1537 raise K8sException(output
)
1540 output
= output
.encode("utf-8").strip()
1541 output
= str(output
).replace("\\n", "\n")
1543 return output
, return_code
1545 except asyncio
.CancelledError
:
1547 except K8sException
:
1549 except Exception as e
:
1550 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1552 if raise_exception_on_error
:
1553 raise K8sException(e
) from e
1557 async def _local_async_exec_pipe(
1561 raise_exception_on_error
: bool = True,
1562 show_error_log
: bool = True,
1563 encode_utf8
: bool = False,
1567 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1568 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1569 command
= "{} | {}".format(command1
, command2
)
1571 "Executing async local command: {}, env: {}".format(command
, env
)
1575 command1
= shlex
.split(command1
)
1576 command2
= shlex
.split(command2
)
1578 environ
= os
.environ
.copy()
1583 read
, write
= os
.pipe()
1584 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1586 process_2
= await asyncio
.create_subprocess_exec(
1587 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1590 stdout
, stderr
= await process_2
.communicate()
1592 return_code
= process_2
.returncode
1596 output
= stdout
.decode("utf-8").strip()
1597 # output = stdout.decode()
1599 output
= stderr
.decode("utf-8").strip()
1600 # output = stderr.decode()
1602 if return_code
!= 0 and show_error_log
:
1604 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1607 self
.log
.debug("Return code: {}".format(return_code
))
1609 if raise_exception_on_error
and return_code
!= 0:
1610 raise K8sException(output
)
1613 output
= output
.encode("utf-8").strip()
1614 output
= str(output
).replace("\\n", "\n")
1616 return output
, return_code
1617 except asyncio
.CancelledError
:
1619 except K8sException
:
1621 except Exception as e
:
1622 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1624 if raise_exception_on_error
:
1625 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The service is read with `kubectl get service ... -o=yaml` and the
    relevant fields are picked from the parsed document.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, and empty when the service
    reports no ingress ip
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # the ingress list may be missing (lookup yields nothing), and an
        # ingress entry may carry a hostname instead of an ip: neither case
        # may break the query, so only the entries with an ip are reported
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1673 async def _exec_get_command(
1674 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1676 """Obtains information about the kdu instance."""
1678 full_command
= self
._get
_get
_command
(
1679 get_command
, kdu_instance
, namespace
, kubeconfig
1682 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1686 async def _exec_inspect_command(
1687 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1689 """Obtains information about an Helm Chart package (´helm show´ command)
1692 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1693 kdu_model: The name or path of an Helm Chart
1694 repo_url: Helm Chart repository url
1697 str: the requested info about the Helm Chart package
1702 repo_str
= " --repo {}".format(repo_url
)
1704 idx
= kdu_model
.find("/")
1707 kdu_model
= kdu_model
[idx
:]
1709 kdu_model
, version
= self
._split
_version
(kdu_model
)
1711 version_str
= "--version {}".format(version
)
1715 full_command
= self
._get
_inspect
_command
(
1716 inspect_command
, kdu_model
, repo_str
, version_str
1719 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1723 async def _get_replica_count_url(
1726 repo_url
: str = None,
1727 resource_name
: str = None,
1729 """Get the replica count value in the Helm Chart Values.
1732 kdu_model: The name or path of an Helm Chart
1733 repo_url: Helm Chart repository url
1734 resource_name: Resource name
1737 True if replicas, False replicaCount
1740 kdu_values
= yaml
.load(
1741 await self
.values_kdu(kdu_model
=kdu_model
, repo_url
=repo_url
),
1742 Loader
=yaml
.SafeLoader
,
1747 "kdu_values not found for kdu_model {}".format(kdu_model
)
1751 kdu_values
= kdu_values
.get(resource_name
, None)
1754 msg
= "resource {} not found in the values in model {}".format(
1755 resource_name
, kdu_model
1758 raise K8sException(msg
)
1760 duplicate_check
= False
1765 if kdu_values
.get("replicaCount", None):
1766 replicas
= kdu_values
["replicaCount"]
1767 replica_str
= "replicaCount"
1768 elif kdu_values
.get("replicas", None):
1769 duplicate_check
= True
1770 replicas
= kdu_values
["replicas"]
1771 replica_str
= "replicas"
1775 "replicaCount or replicas not found in the resource"
1776 "{} values in model {}. Cannot be scaled".format(
1777 resource_name
, kdu_model
1782 "replicaCount or replicas not found in the values"
1783 "in model {}. Cannot be scaled".format(kdu_model
)
1786 raise K8sException(msg
)
1788 # Control if replicas and replicaCount exists at the same time
1789 msg
= "replicaCount and replicas are exists at the same time"
1791 if "replicaCount" in kdu_values
:
1793 raise K8sException(msg
)
1795 if "replicas" in kdu_values
:
1797 raise K8sException(msg
)
1799 return replicas
, replica_str
1801 async def _get_replica_count_instance(
1806 resource_name
: str = None,
1808 """Get the replica count value in the instance.
1811 kdu_instance: The name of the KDU instance
1812 namespace: KDU instance namespace
1814 resource_name: Resource name
1817 True if replicas, False replicaCount
1820 kdu_values
= yaml
.load(
1821 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1822 Loader
=yaml
.SafeLoader
,
1829 kdu_values
.get(resource_name
, None) if resource_name
else None
1833 resource_values
.get("replicaCount", None)
1834 or resource_values
.get("replicas", None)
1838 kdu_values
.get("replicaCount", None)
1839 or kdu_values
.get("replicas", None)
1845 async def _store_status(
1850 namespace
: str = None,
1851 db_dict
: dict = None,
1854 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1856 :param cluster_id (str): the cluster where the KDU instance is deployed
1857 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1858 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1859 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1860 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1861 values for the keys:
1862 - "collection": The Mongo DB collection to write to
1863 - "filter": The query filter to use in the update process
1864 - "path": The dot separated keys which targets the object to be updated
1869 detailed_status
= await self
._status
_kdu
(
1870 cluster_id
=cluster_id
,
1871 kdu_instance
=kdu_instance
,
1873 namespace
=namespace
,
1876 status
= detailed_status
.get("info").get("description")
1877 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1879 # write status to db
1880 result
= await self
.write_app_status_to_db(
1883 detailed_status
=str(detailed_status
),
1884 operation
=operation
,
1888 self
.log
.info("Error writing in database. Task exiting...")
1890 except asyncio
.CancelledError
as e
:
1892 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1894 except Exception as e
:
1895 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1897 # params for use in -f file
1898 # returns values file option and filename (in order to delete it at the end)
1899 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1901 if params
and len(params
) > 0:
1902 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1904 def get_random_number():
1905 r
= random
.randrange(start
=1, stop
=99999999)
1913 value
= params
.get(key
)
1914 if "!!yaml" in str(value
):
1915 value
= yaml
.safe_load(value
[7:])
1916 params2
[key
] = value
1918 values_file
= get_random_number() + ".yaml"
1919 with
open(values_file
, "w") as stream
:
1920 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1922 return "-f {}".format(values_file
), values_file
1926 # params for use in --set option
1928 def _params_to_set_option(params
: dict) -> str:
1930 if params
and len(params
) > 0:
1933 value
= params
.get(key
, None)
1934 if value
is not None:
1936 params_str
+= "--set "
1940 params_str
+= "{}={}".format(key
, value
)
1944 def generate_kdu_instance_name(**kwargs
):
1945 chart_name
= kwargs
["kdu_model"]
1946 # check embeded chart (file or dir)
1947 if chart_name
.startswith("/"):
1948 # extract file or directory name
1949 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1951 elif "://" in chart_name
:
1952 # extract last portion of URL
1953 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1956 for c
in chart_name
:
1957 if c
.isalpha() or c
.isnumeric():
1964 # if does not start with alpha character, prefix 'a'
1965 if not name
[0].isalpha():
1970 def get_random_number():
1971 r
= random
.randrange(start
=1, stop
=99999999)
1973 s
= s
.rjust(10, "0")
1976 name
= name
+ get_random_number()
1979 def _split_version(self
, kdu_model
: str) -> (str, str):
1981 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
1982 parts
= kdu_model
.split(sep
=":")
1984 version
= str(parts
[1])
1985 kdu_model
= parts
[0]
1986 return kdu_model
, version
1988 async def _split_repo(self
, kdu_model
: str) -> str:
1990 idx
= kdu_model
.find("/")
1992 repo_name
= kdu_model
[:idx
]
1995 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1996 """Obtain the Helm repository for an Helm Chart
1999 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2000 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2003 str: the repository URL; if Helm Chart is a local one, the function returns None
2007 idx
= kdu_model
.find("/")
2009 repo_name
= kdu_model
[:idx
]
2010 # Find repository link
2011 local_repo_list
= await self
.repo_list(cluster_uuid
)
2012 for repo
in local_repo_list
:
2013 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None