2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
def _get_namespace(self, cluster_uuid: str) -> str:
    """
    Look up the namespace recorded for a cluster in the "k8sclusters" collection.

    :param cluster_uuid: value matched against the "_id" field of the record
    :return: the "namespace" entry of the record that was found
    """
    # the database query is issued with fail_on_empty disabled
    lookup = {"_id": cluster_uuid}
    cluster_record = self.db.get_one(
        "k8sclusters", q_filter=lookup, fail_on_empty=False
    )
    namespace = cluster_record.get("namespace")
    return namespace
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
159 repo_type
: str = "chart",
162 password
: str = None,
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid
, repo_type
, name
, url
171 paths
, env
= self
._init
_paths
_env
(
172 cluster_name
=cluster_uuid
, create_if_not_exist
=True
176 self
.fs
.sync(from_path
=cluster_uuid
)
178 # helm repo add name url
179 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
184 temp_cert_file
= os
.path
.join(
185 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
187 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
188 with
open(temp_cert_file
, "w") as the_cert
:
190 command
+= " --ca-file {}".format(temp_cert_file
)
193 command
+= " --username={}".format(user
)
196 command
+= " --password={}".format(password
)
198 self
.log
.debug("adding repo: {}".format(command
))
199 await self
._local
_async
_exec
(
200 command
=command
, raise_exception_on_error
=True, env
=env
204 command
= "env KUBECONFIG={} {} repo update {}".format(
205 paths
["kube_config"], self
._helm
_command
, name
207 self
.log
.debug("updating repo: {}".format(command
))
208 await self
._local
_async
_exec
(
209 command
=command
, raise_exception_on_error
=False, env
=env
213 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Run ``helm repo update`` for one repository of a cluster.

    The command is prefixed with ``env KUBECONFIG=<kube_config>`` taken from the
    paths returned by ``_init_paths_env``, in the same way as the other helm
    repository commands of this class (add, list, remove).
    The command is executed with raise_exception_on_error=False, so a failing
    update is not raised from here.

    :param cluster_uuid: cluster identifier, used to initialise the paths/env and
        to sync the file system before and after the command
    :param name: repository name passed to ``helm repo update``
    :param repo_type: only used in the log message
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(cluster_uuid, repo_type, name)
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update, pointed explicitly at the kube config of this cluster
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, name
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
240 async def repo_list(self
, cluster_uuid
: str) -> list:
242 Get the list of registered repositories
244 :return: list of registered repositories: [ (name, url) .... ]
247 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
250 paths
, env
= self
._init
_paths
_env
(
251 cluster_name
=cluster_uuid
, create_if_not_exist
=True
255 self
.fs
.sync(from_path
=cluster_uuid
)
257 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths
["kube_config"], self
._helm
_command
261 # Set exception to false because if there are no repos just want an empty list
262 output
, _rc
= await self
._local
_async
_exec
(
263 command
=command
, raise_exception_on_error
=False, env
=env
267 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
270 if output
and len(output
) > 0:
271 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self
._lower
_keys
_list
(repos
)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Run ``helm repo remove`` for one repository of a cluster.

    The file system is synced before the command and reverse-synced after it.
    The command is executed with raise_exception_on_error=True.

    :param cluster_uuid: cluster identifier used for paths/env and fs sync
    :param name: repository name passed to ``helm repo remove``
    """
    log_text = "remove {} repositories for cluster {}".format(name, cluster_uuid)
    self.log.debug(log_text)

    # paths and helm environment for this cluster
    config_paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # bring the local directory up to date before running helm
    self.fs.sync(from_path=cluster_uuid)

    kube_config = config_paths["kube_config"]
    helm_call = "env KUBECONFIG={} {} repo remove {}".format(
        kube_config, self._helm_command, name
    )
    await self._local_async_exec(
        command=helm_call, raise_exception_on_error=True, env=helm_env
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)
306 uninstall_sw
: bool = False,
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
319 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid
, uninstall_sw
327 self
.fs
.sync(from_path
=cluster_uuid
)
329 # uninstall releases if needed.
331 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
332 if len(releases
) > 0:
336 kdu_instance
= r
.get("name")
337 chart
= r
.get("chart")
339 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
341 await self
.uninstall(
342 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
344 except Exception as e
:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
349 "Error uninstalling release {}: {}".format(
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid
)
359 False # Allow to remove k8s cluster without removing Tiller
363 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
365 # delete cluster directory
366 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
367 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
368 # Remove also local directorio if still exist
369 direct
= self
.fs
.path
+ "/" + cluster_uuid
370 shutil
.rmtree(direct
, ignore_errors
=True)
def _is_helm_chart_a_file(self, chart_name: str):
    """
    Tell whether a chart reference contains more than one "/" character.

    :param chart_name: chart reference to inspect
    :return: True when at least two "/" are present, False otherwise
    """
    separators = chart_name.count("/")
    return separators >= 2
377 async def _install_impl(
385 timeout
: float = 300,
387 db_dict
: dict = None,
388 kdu_name
: str = None,
389 namespace
: str = None,
392 paths
, env
= self
._init
_paths
_env
(
393 cluster_name
=cluster_id
, create_if_not_exist
=True
397 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
398 cluster_id
=cluster_id
, params
=params
402 kdu_model
, version
= self
._split
_version
(kdu_model
)
404 repo
= self
._split
_repo
(kdu_model
)
406 self
.repo_update(cluster_id
, repo
)
408 command
= self
._get
_install
_command
(
416 paths
["kube_config"],
419 self
.log
.debug("installing: {}".format(command
))
422 # exec helm in a task
423 exec_task
= asyncio
.ensure_future(
424 coro_or_future
=self
._local
_async
_exec
(
425 command
=command
, raise_exception_on_error
=False, env
=env
429 # write status in another task
430 status_task
= asyncio
.ensure_future(
431 coro_or_future
=self
._store
_status
(
432 cluster_id
=cluster_id
,
433 kdu_instance
=kdu_instance
,
441 # wait for execution task
442 await asyncio
.wait([exec_task
])
447 output
, rc
= exec_task
.result()
451 output
, rc
= await self
._local
_async
_exec
(
452 command
=command
, raise_exception_on_error
=False, env
=env
455 # remove temporal values yaml file
457 os
.remove(file_to_delete
)
460 await self
._store
_status
(
461 cluster_id
=cluster_id
,
462 kdu_instance
=kdu_instance
,
471 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
473 raise K8sException(msg
)
479 kdu_model
: str = None,
481 timeout
: float = 300,
483 db_dict
: dict = None,
485 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
488 self
.fs
.sync(from_path
=cluster_uuid
)
490 # look for instance to obtain namespace
491 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
492 if not instance_info
:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
496 paths
, env
= self
._init
_paths
_env
(
497 cluster_name
=cluster_uuid
, create_if_not_exist
=True
501 self
.fs
.sync(from_path
=cluster_uuid
)
504 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
505 cluster_id
=cluster_uuid
, params
=params
509 kdu_model
, version
= self
._split
_version
(kdu_model
)
511 repo
= self
._split
_repo
(kdu_model
)
513 self
.repo_update(cluster_uuid
, repo
)
515 command
= self
._get
_upgrade
_command
(
518 instance_info
["namespace"],
523 paths
["kube_config"],
526 self
.log
.debug("upgrading: {}".format(command
))
530 # exec helm in a task
531 exec_task
= asyncio
.ensure_future(
532 coro_or_future
=self
._local
_async
_exec
(
533 command
=command
, raise_exception_on_error
=False, env
=env
536 # write status in another task
537 status_task
= asyncio
.ensure_future(
538 coro_or_future
=self
._store
_status
(
539 cluster_id
=cluster_uuid
,
540 kdu_instance
=kdu_instance
,
541 namespace
=instance_info
["namespace"],
548 # wait for execution task
549 await asyncio
.wait([exec_task
])
553 output
, rc
= exec_task
.result()
557 output
, rc
= await self
._local
_async
_exec
(
558 command
=command
, raise_exception_on_error
=False, env
=env
561 # remove temporal values yaml file
563 os
.remove(file_to_delete
)
566 await self
._store
_status
(
567 cluster_id
=cluster_uuid
,
568 kdu_instance
=kdu_instance
,
569 namespace
=instance_info
["namespace"],
577 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
579 raise K8sException(msg
)
582 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
584 # return new revision number
585 instance
= await self
.get_instance_info(
586 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
589 revision
= int(instance
.get("revision"))
590 self
.log
.debug("New revision: {}".format(revision
))
600 total_timeout
: float = 1800,
601 cluster_uuid
: str = None,
602 kdu_model
: str = None,
604 db_dict
: dict = None,
607 """Scale a resource in a Helm Chart.
610 kdu_instance: KDU instance name
611 scale: Scale to which to set the resource
612 resource_name: Resource name
613 total_timeout: The time, in seconds, to wait
614 cluster_uuid: The UUID of the cluster
615 kdu_model: The chart reference
616 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
617 The --wait flag will be set automatically if --atomic is used
618 db_dict: Dictionary for any additional data
619 kwargs: Additional parameters
622 True if successful, False otherwise
625 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
627 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
628 resource_name
, kdu_model
, cluster_uuid
631 self
.log
.debug(debug_mgs
)
633 # look for instance to obtain namespace
634 # get_instance_info function calls the sync command
635 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
636 if not instance_info
:
637 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
640 paths
, env
= self
._init
_paths
_env
(
641 cluster_name
=cluster_uuid
, create_if_not_exist
=True
645 kdu_model
, version
= self
._split
_version
(kdu_model
)
647 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
649 _
, replica_str
= await self
._get
_replica
_count
_url
(
650 kdu_model
, repo_url
, resource_name
653 command
= self
._get
_upgrade
_scale
_command
(
656 instance_info
["namespace"],
663 paths
["kube_config"],
666 self
.log
.debug("scaling: {}".format(command
))
669 # exec helm in a task
670 exec_task
= asyncio
.ensure_future(
671 coro_or_future
=self
._local
_async
_exec
(
672 command
=command
, raise_exception_on_error
=False, env
=env
675 # write status in another task
676 status_task
= asyncio
.ensure_future(
677 coro_or_future
=self
._store
_status
(
678 cluster_id
=cluster_uuid
,
679 kdu_instance
=kdu_instance
,
680 namespace
=instance_info
["namespace"],
687 # wait for execution task
688 await asyncio
.wait([exec_task
])
692 output
, rc
= exec_task
.result()
695 output
, rc
= await self
._local
_async
_exec
(
696 command
=command
, raise_exception_on_error
=False, env
=env
700 await self
._store
_status
(
701 cluster_id
=cluster_uuid
,
702 kdu_instance
=kdu_instance
,
703 namespace
=instance_info
["namespace"],
711 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
713 raise K8sException(msg
)
716 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
720 async def get_scale_count(
728 """Get a resource scale count.
731 cluster_uuid: The UUID of the cluster
732 resource_name: Resource name
733 kdu_instance: KDU instance name
734 kdu_model: The name or path of an Helm Chart
735 kwargs: Additional parameters
738 Resource instance count
742 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
745 # look for instance to obtain namespace
746 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
747 if not instance_info
:
748 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
751 paths
, env
= self
._init
_paths
_env
(
752 cluster_name
=cluster_uuid
, create_if_not_exist
=True
755 replicas
= await self
._get
_replica
_count
_instance
(
756 kdu_instance
=kdu_instance
,
757 namespace
=instance_info
["namespace"],
758 kubeconfig
=paths
["kube_config"],
759 resource_name
=resource_name
,
762 # Get default value if scale count is not found from provided values
764 repo_url
= await self
._find
_repo
(
765 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
767 replicas
, _
= await self
._get
_replica
_count
_url
(
768 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
772 msg
= "Replica count not found. Cannot be scaled"
774 raise K8sException(msg
)
779 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
782 "rollback kdu_instance {} to revision {} from cluster {}".format(
783 kdu_instance
, revision
, cluster_uuid
788 self
.fs
.sync(from_path
=cluster_uuid
)
790 # look for instance to obtain namespace
791 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
792 if not instance_info
:
793 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
796 paths
, env
= self
._init
_paths
_env
(
797 cluster_name
=cluster_uuid
, create_if_not_exist
=True
801 self
.fs
.sync(from_path
=cluster_uuid
)
803 command
= self
._get
_rollback
_command
(
804 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
807 self
.log
.debug("rolling_back: {}".format(command
))
809 # exec helm in a task
810 exec_task
= asyncio
.ensure_future(
811 coro_or_future
=self
._local
_async
_exec
(
812 command
=command
, raise_exception_on_error
=False, env
=env
815 # write status in another task
816 status_task
= asyncio
.ensure_future(
817 coro_or_future
=self
._store
_status
(
818 cluster_id
=cluster_uuid
,
819 kdu_instance
=kdu_instance
,
820 namespace
=instance_info
["namespace"],
822 operation
="rollback",
827 # wait for execution task
828 await asyncio
.wait([exec_task
])
833 output
, rc
= exec_task
.result()
836 await self
._store
_status
(
837 cluster_id
=cluster_uuid
,
838 kdu_instance
=kdu_instance
,
839 namespace
=instance_info
["namespace"],
841 operation
="rollback",
847 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
849 raise K8sException(msg
)
852 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
854 # return new revision number
855 instance
= await self
.get_instance_info(
856 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
859 revision
= int(instance
.get("revision"))
860 self
.log
.debug("New revision: {}".format(revision
))
865 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
867 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
868 (this call should happen after all _terminate-config-primitive_ of the VNF
871 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
872 :param kdu_instance: unique name for the KDU instance to be deleted
873 :param kwargs: Additional parameters (None yet)
874 :return: True if successful
878 "uninstall kdu_instance {} from cluster {}".format(
879 kdu_instance
, cluster_uuid
884 self
.fs
.sync(from_path
=cluster_uuid
)
886 # look for instance to obtain namespace
887 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
888 if not instance_info
:
889 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
892 paths
, env
= self
._init
_paths
_env
(
893 cluster_name
=cluster_uuid
, create_if_not_exist
=True
897 self
.fs
.sync(from_path
=cluster_uuid
)
899 command
= self
._get
_uninstall
_command
(
900 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
902 output
, _rc
= await self
._local
_async
_exec
(
903 command
=command
, raise_exception_on_error
=True, env
=env
907 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
909 return self
._output
_to
_table
(output
)
911 async def instances_list(self
, cluster_uuid
: str) -> list:
913 returns a list of deployed releases in a cluster
915 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
919 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
922 self
.fs
.sync(from_path
=cluster_uuid
)
924 # execute internal command
925 result
= await self
._instances
_list
(cluster_uuid
)
928 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Find one release by name among the releases listed for a cluster.

    :param cluster_uuid: cluster identifier passed to instances_list
    :param kdu_instance: value compared with the "name" entry of each release
    :return: the first release whose "name" equals kdu_instance, or None when
        there is no such release (a debug message is logged in that case)
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is not None:
        return found

    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
940 async def upgrade_charm(
944 charm_id
: str = None,
945 charm_type
: str = None,
946 timeout
: float = None,
948 """This method upgrade charms in VNFs
951 ee_id: Execution environment id
952 path: Local path to the charm
954 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
955 timeout: (Float) Timeout for the ns update operation
958 The output of the update operation if status equals to "completed"
960 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
962 async def exec_primitive(
964 cluster_uuid
: str = None,
965 kdu_instance
: str = None,
966 primitive_name
: str = None,
967 timeout
: float = 300,
969 db_dict
: dict = None,
972 """Exec primitive (Juju action)
974 :param cluster_uuid: The UUID of the cluster or namespace:cluster
975 :param kdu_instance: The unique name of the KDU instance
976 :param primitive_name: Name of action that will be executed
977 :param timeout: Timeout for action execution
978 :param params: Dictionary of all the parameters needed for the action
979 :db_dict: Dictionary for any additional data
980 :param kwargs: Additional parameters (None yet)
982 :return: Returns the output of the action
985 "KDUs deployed with Helm don't support actions "
986 "different from rollback, upgrade and status"
989 async def get_services(
990 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
993 Returns a list of services defined for the specified kdu instance.
995 :param cluster_uuid: UUID of a K8s cluster known by OSM
996 :param kdu_instance: unique name for the KDU instance
997 :param namespace: K8s namespace used by the KDU instance
998 :return: If successful, it will return a list of services, Each service
999 can have the following data:
1000 - `name` of the service
1001 - `type` type of service in the k8 cluster
1002 - `ports` List of ports offered by the service, for each port includes at least
1003 name, port, protocol
1004 - `cluster_ip` Internal ip to be used inside k8s cluster
1005 - `external_ip` List of external ips (in case they are available)
1009 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1010 cluster_uuid
, kdu_instance
1015 paths
, env
= self
._init
_paths
_env
(
1016 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1020 self
.fs
.sync(from_path
=cluster_uuid
)
1022 # get list of services names for kdu
1023 service_names
= await self
._get
_services
(
1024 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1028 for service
in service_names
:
1029 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1030 service_list
.append(service
)
1033 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1037 async def get_service(
1038 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1042 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1043 service_name
, namespace
, cluster_uuid
1048 self
.fs
.sync(from_path
=cluster_uuid
)
1050 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1053 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1057 async def status_kdu(
1058 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1059 ) -> Union
[str, dict]:
1061 This call would retrieve tha current state of a given KDU instance. It would be
1062 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1063 values_ of the configuration parameters applied to a given instance. This call
1064 would be based on the `status` call.
1066 :param cluster_uuid: UUID of a K8s cluster known by OSM
1067 :param kdu_instance: unique name for the KDU instance
1068 :param kwargs: Additional parameters (None yet)
1069 :param yaml_format: if the return shall be returned as an YAML string or as a
1071 :return: If successful, it will return the following vector of arguments:
1072 - K8s `namespace` in the cluster where the KDU lives
1073 - `state` of the KDU instance. It can be:
1080 - List of `resources` (objects) that this release consists of, sorted by kind,
1081 and the status of those resources
1082 - Last `deployment_time`.
1086 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1087 cluster_uuid
, kdu_instance
1092 self
.fs
.sync(from_path
=cluster_uuid
)
1094 # get instance: needed to obtain namespace
1095 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1096 for instance
in instances
:
1097 if instance
.get("name") == kdu_instance
:
1100 # instance does not exist
1102 "Instance name: {} not found in cluster: {}".format(
1103 kdu_instance
, cluster_uuid
1107 status
= await self
._status
_kdu
(
1108 cluster_id
=cluster_uuid
,
1109 kdu_instance
=kdu_instance
,
1110 namespace
=instance
["namespace"],
1111 yaml_format
=yaml_format
,
1112 show_error_log
=True,
1116 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Run the "values" get command for a KDU instance.

    :param kdu_instance: instance name, forwarded to _exec_get_command
    :param namespace: namespace, forwarded to _exec_get_command
    :param kubeconfig: kube config reference, forwarded to _exec_get_command
    :return: whatever _exec_get_command returns for get_command="values"
    """
    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    query = {
        "get_command": "values",
        "kdu_instance": kdu_instance,
        "namespace": namespace,
        "kubeconfig": kubeconfig,
    }
    values = await self._exec_get_command(**query)
    return values
1133 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1134 """Method to obtain the Helm Chart package's values
1137 kdu_model: The name or path of an Helm Chart
1138 repo_url: Helm Chart repository url
1141 str: the values of the Helm Chart package
1145 "inspect kdu_model values {} from (optional) repo: {}".format(
1150 return await self
._exec
_inspect
_command
(
1151 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "readme" inspect command for a Helm Chart.

    :param kdu_model: chart reference, forwarded to _exec_inspect_command
    :param repo_url: optional repository url, forwarded to _exec_inspect_command
    :return: whatever _exec_inspect_command returns for inspect_command="readme"
    """
    log_text = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(log_text)

    readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
1164 async def synchronize_repos(self
, cluster_uuid
: str):
1166 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1168 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1169 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1171 local_repo_list
= await self
.repo_list(cluster_uuid
)
1172 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1174 deleted_repo_list
= []
1175 added_repo_dict
= {}
1177 # iterate over the list of repos in the database that should be
1178 # added if not present
1179 for repo_name
, db_repo
in db_repo_dict
.items():
1181 # check if it is already present
1182 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1183 repo_id
= db_repo
.get("_id")
1184 if curr_repo_url
!= db_repo
["url"]:
1187 "repo {} url changed, delete and and again".format(
1191 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1192 deleted_repo_list
.append(repo_id
)
1195 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1196 if "ca_cert" in db_repo
:
1197 await self
.repo_add(
1201 cert
=db_repo
["ca_cert"],
1204 await self
.repo_add(
1209 added_repo_dict
[repo_id
] = db_repo
["name"]
1210 except Exception as e
:
1212 "Error adding repo id: {}, err_msg: {} ".format(
1217 # Delete repos that are present but not in nbi_list
1218 for repo_name
in local_repo_dict
:
1219 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1220 self
.log
.debug("delete repo {}".format(repo_name
))
1222 await self
.repo_remove(cluster_uuid
, repo_name
)
1223 deleted_repo_list
.append(repo_name
)
1224 except Exception as e
:
1226 "Error deleting repo, name: {}, err_msg: {}".format(
1231 return deleted_repo_list
, added_repo_dict
1233 except K8sException
:
1235 except Exception as e
:
1236 # Do not raise errors synchronizing repos
1237 self
.log
.error("Error synchronizing repos: {}".format(e
))
1238 raise Exception("Error synchronizing repos: {}".format(e
))
def _get_db_repos_dict(self, repo_ids: list):
    """
    Fetch the "k8srepos" records for the given ids and index them by name.

    :param repo_ids: identifiers matched against the "_id" field, one query each
    :return: dict mapping the "name" entry of every record to the record itself
    """
    # lazily query one record per id; the comprehension consumes them in order
    records = (self.db.get_one("k8srepos", {"_id": repo_id}) for repo_id in repo_ids)
    return {record["name"]: record for record in records}
1248 ####################################################################################
1249 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1250 ####################################################################################
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    Also creates helm3 dirs according to the new directory specification; those
    paths are not returned but assigned to helm environment variables.

    No implementation is visible in this block; it sits in the section of the
    class marked as to be implemented by subclasses.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably whether missing dirs are created;
        callers in this class always pass True (TODO confirm in subclasses)
    :return: Dictionary with config_paths and dictionary with helm environment
        variables; callers unpack it as ``paths, env`` and read
        ``paths["kube_config"]``
    """
1265 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1267 Implements the helm version dependent cluster initialization
1271 async def _instances_list(self
, cluster_id
):
1273 Implements the helm version dependent helm instances list
1277 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1279 Implements the helm version dependent method to obtain services from a helm instance
1283 async def _status_kdu(
1287 namespace
: str = None,
1288 yaml_format
: bool = False,
1289 show_error_log
: bool = False,
1290 ) -> Union
[str, dict]:
1292 Implements the helm version dependent method to obtain status of a helm instance
1296 def _get_install_command(
1308 Obtain command to be executed to delete the indicated instance
1312 def _get_upgrade_scale_command(
1325 """Obtain command to be executed to upgrade the indicated instance."""
1328 def _get_upgrade_command(
1340 Obtain command to be executed to upgrade the indicated instance
1344 def _get_rollback_command(
1345 self
, kdu_instance
, namespace
, revision
, kubeconfig
1348 Obtain command to be executed to rollback the indicated instance
1352 def _get_uninstall_command(
1353 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1356 Obtain command to be executed to delete the indicated instance
1360 def _get_inspect_command(
1361 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1363 """Generates the command to obtain the information about an Helm Chart package
1364 (´helm show ...´ command)
1367 show_command: the second part of the command (`helm show <show_command>`)
1368 kdu_model: The name or path of an Helm Chart
1369 repo_url: Helm Chart repository url
1370 version: constraint with specific version of the Chart to use
1373 str: the generated Helm Chart command
1377 def _get_get_command(
1378 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1380 """Obtain command to be executed to get information about the kdu instance."""
1383 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1385 Method call to uninstall cluster software for helm. This method is dependent
1387 For Helm v2 it will be called when Tiller must be uninstalled
1388 For Helm v3 it does nothing and does not need to be callled
1392 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1394 Obtains the cluster repos identifiers
1398 ####################################################################################
1399 ################################### P R I V A T E ##################################
1400 ####################################################################################
1404 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1405 if os
.path
.exists(filename
):
1408 msg
= "File {} does not exist".format(filename
)
1409 if exception_if_not_exists
:
1410 raise K8sException(msg
)
1413 def _remove_multiple_spaces(strobj
):
1414 strobj
= strobj
.strip()
1415 while " " in strobj
:
1416 strobj
= strobj
.replace(" ", " ")
1420 def _output_to_lines(output
: str) -> list:
1421 output_lines
= list()
1422 lines
= output
.splitlines(keepends
=False)
1426 output_lines
.append(line
)
1430 def _output_to_table(output
: str) -> list:
1431 output_table
= list()
1432 lines
= output
.splitlines(keepends
=False)
1434 line
= line
.replace("\t", " ")
1436 output_table
.append(line_list
)
1437 cells
= line
.split(sep
=" ")
1441 line_list
.append(cell
)
1445 def _parse_services(output
: str) -> list:
1446 lines
= output
.splitlines(keepends
=False)
1449 line
= line
.replace("\t", " ")
1450 cells
= line
.split(sep
=" ")
1451 if len(cells
) > 0 and cells
[0].startswith("service/"):
1452 elems
= cells
[0].split(sep
="/")
1454 services
.append(elems
[1])
1458 def _get_deep(dictionary
: dict, members
: tuple):
1463 value
= target
.get(m
)
1472 # find key:value in several lines
1474 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1475 for line
in p_lines
:
1477 if line
.startswith(p_key
+ ":"):
1478 parts
= line
.split(":")
1479 the_value
= parts
[1].strip()
1487 def _lower_keys_list(input_list
: list):
1489 Transform the keys in a list of dictionaries to lower case and returns a new list
1494 for dictionary
in input_list
:
1495 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1496 new_list
.append(new_dict
)
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command asynchronously and collect its output.

    :param command: command line; it is normalised with
        ``_remove_multiple_spaces`` and tokenised with ``shlex.split``
    :param raise_exception_on_error: raise K8sException when the command
        exits with a non-zero code or cannot be executed
    :param show_error_log: log the output when the return code is non-zero
    :param encode_utf8: encode the output to UTF-8 bytes and return the
        ``str()`` of those bytes with escaped newlines expanded
    :param env: extra environment variables merged over ``os.environ``
    :return: tuple (output, return_code). When the process wrote to stderr,
        the output is stderr, otherwise stdout. ("", -1) is returned when the
        execution itself fails and ``raise_exception_on_error`` is False.
    :raises K8sException: see ``raise_exception_on_error``
    """
    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # Kept outside the try so the cancellation handler can tell whether a
    # child process was actually started.
    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            # stderr takes precedence over stdout when both are present
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        # Cancelling the awaiting task does not stop the child by itself:
        # kill it if it is still running so it is not left orphaned.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # the process finished in the meantime
                pass
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Run ``command1 | command2`` asynchronously and collect the output of
    ``command2``.

    :param command1: command whose stdout feeds the pipe
    :param command2: command reading the pipe on stdin
    :param raise_exception_on_error: raise K8sException when ``command2``
        exits with a non-zero code or the execution fails
    :param show_error_log: log the output when the return code is non-zero
    :param encode_utf8: encode the output to UTF-8 bytes and return the
        ``str()`` of those bytes with escaped newlines expanded
    :param env: extra environment variables merged over ``os.environ``
    :return: tuple (output, return_code) of ``command2``; ("", -1) when the
        execution itself fails and ``raise_exception_on_error`` is False
    :raises K8sException: see ``raise_exception_on_error``
    """
    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read, write = os.pipe()
        try:
            try:
                await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
            finally:
                # The parent must drop its copy of the write end so that
                # command2 sees EOF; doing it in "finally" also avoids
                # leaking the descriptor when command1 cannot be started.
                os.close(write)
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
            )
        finally:
            # Same for the read end: released whether or not command2 started.
            os.close(read)
        stdout, stderr = await process_2.communicate()

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (only present for services of type
    LoadBalancer; empty while no ingress ip has been assigned)
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None while the load balancer has no ingress yet,
        # and an ingress entry may carry no "ip" key; neither case should
        # make the query fail.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if "ip" in elem
        ]

    return service
async def _exec_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """Run the "get" command for a kdu instance and return its output.

    The command line comes from ``_get_get_command`` and is executed with
    ``_local_async_exec`` using that method's defaults; the return code is
    discarded.
    """
    command_line = self._get_get_command(
        get_command, kdu_instance, namespace, kubeconfig
    )
    command_output, _return_code = await self._local_async_exec(
        command=command_line
    )
    return command_output
async def _exec_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """Run `helm show <inspect_command>` for a Helm Chart and return the output.

    Args:
        inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
        kdu_model: the name or path of a Helm Chart, optionally carrying a
            version that ``_split_version`` separates
        repo_url: Helm Chart repository url; when given, everything up to and
            including the first "/" of ``kdu_model`` is dropped

    Returns:
        str: the requested info about the Helm Chart package
    """
    repo_option = ""
    if repo_url:
        repo_option = " --repo {}".format(repo_url)

        # with an explicit repository url the "<repo>/" prefix is not wanted
        _prefix, slash, remainder = kdu_model.partition("/")
        if slash:
            kdu_model = remainder

    kdu_model, version = self._split_version(kdu_model)
    version_option = "--version {}".format(version) if version else ""

    command_line = self._get_inspect_command(
        inspect_command, kdu_model, repo_option, version_option
    )

    command_output, _return_code = await self._local_async_exec(
        command=command_line
    )

    return command_output
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str = None,
    resource_name: str = None,
):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica setting is
            looked up inside that entry of the values

    Returns:
        tuple: (replicas, replica_str), where replica_str is the key that
        holds the value: "replicaCount" or "replicas"

    Raises:
        K8sException: when the values (or the resource entry) are empty, when
            neither key holds a truthy value, or when both keys are present
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
        Loader=yaml.SafeLoader,
    )

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    duplicate_check = False

    replica_str = ""
    replicas = None

    if kdu_values.get("replicaCount", None):
        replicas = kdu_values["replicaCount"]
        replica_str = "replicaCount"
    elif kdu_values.get("replicas", None):
        duplicate_check = True
        replicas = kdu_values["replicas"]
        replica_str = "replicas"
    else:
        # The two halves of each message are adjacent string literals; the
        # separating space was missing ("resource{}", "valuesin").
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time
    msg = "replicaCount and replicas are exists at the same time"
    if duplicate_check:
        if "replicaCount" in kdu_values:
            self.log.error(msg)
            raise K8sException(msg)
    else:
        if "replicas" in kdu_values:
            self.log.error(msg)
            raise K8sException(msg)

    return replicas, replica_str
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
):
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: kubeconfig passed through to ``get_values_kdu``
        resource_name: Resource name; when it names a non-empty entry of the
            instance values, the replica setting is read from that entry,
            otherwise from the top level of the values

    Returns:
        The first truthy value among "replicaCount" and "replicas", or None
        when the instance has no values or neither key holds a truthy value
    """
    kdu_values = yaml.load(
        await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
        Loader=yaml.SafeLoader,
    )

    if not kdu_values:
        return None

    # Prefer the resource section when one is requested and non-empty
    scope = kdu_values.get(resource_name, None) if resource_name else None
    if not scope:
        scope = kdu_values

    return scope.get("replicaCount", None) or scope.get("replicas", None)
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Periodically read the status of a KDU instance and write it to the
    database.

    Every iteration first sleeps ``check_every`` seconds, then queries
    ``_status_kdu`` and stores the result with ``write_app_status_to_db``.
    The loop ends when the database write reports failure, when the task is
    cancelled, or after the first iteration if ``run_once`` is True. Any
    other exception is logged and the loop goes on.

    :param cluster_id: identifier of the cluster
    :param operation: operation name stored together with the status
    :param kdu_instance: name of the KDU instance
    :param namespace: namespace of the instance
    :param check_every: seconds to wait before each status query
    :param db_dict: database descriptor handed to ``write_app_status_to_db``
    :param run_once: perform a single iteration only
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            kdu_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )
            description = kdu_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, description))
            # write status to db
            written = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(description),
                detailed_status=str(kdu_status),
                operation=operation,
            )
            if not written:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            self.log.debug("Task cancelled")
            return
        except Exception as exc:
            # logged only; the loop continues unless run_once is set
            self.log.debug(
                "_store_status exception: {}".format(str(exc)), exc_info=True
            )
        finally:
            if run_once:
                return
1900 # params for use in -f file
1901 # returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the parameters into a YAML values file and build the matching
    ``-f`` option.

    String values containing the ``!!yaml`` marker are parsed as YAML after
    dropping their first 7 characters; every other value is written as is.
    The file gets a random ten-digit name with a ``.yaml`` suffix and is
    opened by that bare name, i.e. relative to the current working directory.

    :param cluster_id: cluster whose paths/environment are initialised first
    :param params: parameters to write
    :return: tuple ("-f <file>", <file>) so the caller can delete the file
        afterwards, or ("", None) when there are no parameters
    """
    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    params2 = dict()
    for key, value in params.items():
        # Only strings can carry the marker. A dict or list whose repr merely
        # contains "!!yaml" must not be sliced, which used to raise.
        if isinstance(value, str) and "!!yaml" in value:
            value = yaml.safe_load(value[7:])
        params2[key] = value

    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    values_file = random_number + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1929 # params for use in --set option
1931 def _params_to_set_option(params
: dict) -> str:
1933 if params
and len(params
) > 0:
1936 value
= params
.get(key
, None)
1937 if value
is not None:
1939 params_str
+= "--set "
1943 params_str
+= "{}={}".format(key
, value
)
def generate_kdu_instance_name(**kwargs):
    """
    Generate an instance name from the chart name plus a random suffix.

    The chart name is reduced to its last path segment when it is an absolute
    path or a URL, every character that is not an ASCII letter or digit
    becomes "-", the result is cut to 35 characters and prefixed with "a"
    when it is empty or does not start with a letter. A "-" and a ten-digit
    zero-padded random number are appended and the whole name is lower-cased.

    :param kwargs: must contain ``kdu_model`` (chart name, path or URL)
    :return: the generated name
    :raises KeyError: when ``kdu_model`` is not provided
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep the last path segment only.
    # A trailing "/" is dropped first so a directory path does not end up
    # with an empty name.
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    # Restrict to ASCII: str.isalpha()/isnumeric() also accept non-ASCII
    # characters, which must not end up in the generated name.
    name = "".join(
        c if (c.isascii() and c.isalnum()) else "-" for c in chart_name
    )[:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    random_suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_suffix).lower()
def _split_version(self, kdu_model: str) -> (str, str):
    """
    Separate an optional ``:<version>`` suffix from a chart reference.

    The split only happens when ``_is_helm_chart_a_file`` reports that the
    model is not a file and the model contains exactly one ":"; in every
    other case the model is returned unchanged with a None version.

    :param kdu_model: chart reference, optionally ``<chart>:<version>``
    :return: tuple (kdu_model, version)
    """
    if self._is_helm_chart_a_file(kdu_model) or kdu_model.count(":") != 1:
        return kdu_model, None

    chart, _colon, version = kdu_model.partition(":")
    return chart, version
1991 async def _split_repo(self
, kdu_model
: str) -> str:
1993 idx
= kdu_model
.find("/")
1995 repo_name
= kdu_model
[:idx
]
1998 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1999 """Obtain the Helm repository for an Helm Chart
2002 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2003 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2006 str: the repository URL; if Helm Chart is a local one, the function returns None
2010 idx
= kdu_model
.find("/")
2012 repo_name
= kdu_model
[:idx
]
2013 # Find repository link
2014 local_repo_list
= await self
.repo_list(cluster_uuid
)
2015 for repo
in local_repo_list
:
2016 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None