2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 def _get_namespace(self
, cluster_uuid
: str) -> str:
95 Obtains the namespace used by the cluster with the uuid passed by argument
97 param: cluster_uuid: cluster's uuid
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster
= self
.db
.get_one(
102 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
104 return k8scluster
.get("namespace")
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
159 repo_type
: str = "chart",
162 password
: str = None,
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid
, repo_type
, name
, url
171 paths
, env
= self
._init
_paths
_env
(
172 cluster_name
=cluster_uuid
, create_if_not_exist
=True
176 self
.fs
.sync(from_path
=cluster_uuid
)
178 # helm repo add name url
179 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
184 temp_cert_file
= os
.path
.join(
185 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
187 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
188 with
open(temp_cert_file
, "w") as the_cert
:
190 command
+= " --ca-file {}".format(temp_cert_file
)
193 command
+= " --username={}".format(user
)
196 command
+= " --password={}".format(password
)
198 self
.log
.debug("adding repo: {}".format(command
))
199 await self
._local
_async
_exec
(
200 command
=command
, raise_exception_on_error
=True, env
=env
204 command
= "env KUBECONFIG={} {} repo update {}".format(
205 paths
["kube_config"], self
._helm
_command
, name
207 self
.log
.debug("updating repo: {}".format(command
))
208 await self
._local
_async
_exec
(
209 command
=command
, raise_exception_on_error
=False, env
=env
213 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Run "repo update" with the configured helm binary for one repository

    :param cluster_uuid: cluster whose helm environment is used
    :param name: repository name appended to the "repo update" command
    :param repo_type: only used in the debug message
    :return: None; a failing command does not raise
        (raise_exception_on_error=False)
    """
    announce = "Cluster {}, updating {} repository {}".format(
        cluster_uuid, repo_type, name
    )
    self.log.debug(announce)

    # only the environment is needed here; the paths are not used by the command
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    update_cmd = "{} repo update {}".format(self._helm_command, name)
    self.log.debug("updating repo: {}".format(update_cmd))
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
240 async def repo_list(self
, cluster_uuid
: str) -> list:
242 Get the list of registered repositories
244 :return: list of registered repositories: [ (name, url) .... ]
247 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
250 paths
, env
= self
._init
_paths
_env
(
251 cluster_name
=cluster_uuid
, create_if_not_exist
=True
255 self
.fs
.sync(from_path
=cluster_uuid
)
257 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths
["kube_config"], self
._helm
_command
261 # Set exception to false because if there are no repos just want an empty list
262 output
, _rc
= await self
._local
_async
_exec
(
263 command
=command
, raise_exception_on_error
=False, env
=env
267 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
270 if output
and len(output
) > 0:
271 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self
._lower
_keys
_list
(repos
)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Run "repo remove" with the configured helm binary for one repository

    :param cluster_uuid: cluster whose kubeconfig and helm environment are used
    :param name: repository name appended to the "repo remove" command
    :return: None; a failing command is reported by the executor
        (raise_exception_on_error=True)
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    cluster_paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    kube_config = cluster_paths["kube_config"]
    remove_cmd = "env KUBECONFIG={} {} repo remove {}".format(
        kube_config, self._helm_command, name
    )
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
306 uninstall_sw
: bool = False,
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
319 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid
, uninstall_sw
327 self
.fs
.sync(from_path
=cluster_uuid
)
329 # uninstall releases if needed.
331 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
332 if len(releases
) > 0:
336 kdu_instance
= r
.get("name")
337 chart
= r
.get("chart")
339 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
341 await self
.uninstall(
342 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
344 except Exception as e
:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
349 "Error uninstalling release {}: {}".format(
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid
)
359 False # Allow to remove k8s cluster without removing Tiller
363 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
365 # delete cluster directory
366 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
367 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
368 # Remove also local directorio if still exist
369 direct
= self
.fs
.path
+ "/" + cluster_uuid
370 shutil
.rmtree(direct
, ignore_errors
=True)
374 def _is_helm_chart_a_file(self
, chart_name
: str):
375 return chart_name
.count("/") > 1
377 async def _install_impl(
385 timeout
: float = 300,
387 db_dict
: dict = None,
388 kdu_name
: str = None,
389 namespace
: str = None,
392 paths
, env
= self
._init
_paths
_env
(
393 cluster_name
=cluster_id
, create_if_not_exist
=True
397 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
398 cluster_id
=cluster_id
, params
=params
402 kdu_model
, version
= self
._split
_version
(kdu_model
)
404 repo
= self
._split
_repo
(kdu_model
)
406 self
.repo_update(cluster_id
, repo
)
408 command
= self
._get
_install
_command
(
416 paths
["kube_config"],
419 self
.log
.debug("installing: {}".format(command
))
422 # exec helm in a task
423 exec_task
= asyncio
.ensure_future(
424 coro_or_future
=self
._local
_async
_exec
(
425 command
=command
, raise_exception_on_error
=False, env
=env
429 # write status in another task
430 status_task
= asyncio
.ensure_future(
431 coro_or_future
=self
._store
_status
(
432 cluster_id
=cluster_id
,
433 kdu_instance
=kdu_instance
,
441 # wait for execution task
442 await asyncio
.wait([exec_task
])
447 output
, rc
= exec_task
.result()
451 output
, rc
= await self
._local
_async
_exec
(
452 command
=command
, raise_exception_on_error
=False, env
=env
455 # remove temporal values yaml file
457 os
.remove(file_to_delete
)
460 await self
._store
_status
(
461 cluster_id
=cluster_id
,
462 kdu_instance
=kdu_instance
,
471 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
473 raise K8sException(msg
)
479 kdu_model
: str = None,
481 timeout
: float = 300,
483 db_dict
: dict = None,
485 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
488 self
.fs
.sync(from_path
=cluster_uuid
)
490 # look for instance to obtain namespace
491 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
492 if not instance_info
:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
496 paths
, env
= self
._init
_paths
_env
(
497 cluster_name
=cluster_uuid
, create_if_not_exist
=True
501 self
.fs
.sync(from_path
=cluster_uuid
)
504 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
505 cluster_id
=cluster_uuid
, params
=params
509 kdu_model
, version
= self
._split
_version
(kdu_model
)
511 repo
= self
._split
_repo
(kdu_model
)
513 self
.repo_update(cluster_uuid
, repo
)
515 command
= self
._get
_upgrade
_command
(
518 instance_info
["namespace"],
523 paths
["kube_config"],
526 self
.log
.debug("upgrading: {}".format(command
))
530 # exec helm in a task
531 exec_task
= asyncio
.ensure_future(
532 coro_or_future
=self
._local
_async
_exec
(
533 command
=command
, raise_exception_on_error
=False, env
=env
536 # write status in another task
537 status_task
= asyncio
.ensure_future(
538 coro_or_future
=self
._store
_status
(
539 cluster_id
=cluster_uuid
,
540 kdu_instance
=kdu_instance
,
541 namespace
=instance_info
["namespace"],
548 # wait for execution task
549 await asyncio
.wait([exec_task
])
553 output
, rc
= exec_task
.result()
557 output
, rc
= await self
._local
_async
_exec
(
558 command
=command
, raise_exception_on_error
=False, env
=env
561 # remove temporal values yaml file
563 os
.remove(file_to_delete
)
566 await self
._store
_status
(
567 cluster_id
=cluster_uuid
,
568 kdu_instance
=kdu_instance
,
569 namespace
=instance_info
["namespace"],
577 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
579 raise K8sException(msg
)
582 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
584 # return new revision number
585 instance
= await self
.get_instance_info(
586 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
589 revision
= int(instance
.get("revision"))
590 self
.log
.debug("New revision: {}".format(revision
))
600 total_timeout
: float = 1800,
601 cluster_uuid
: str = None,
602 kdu_model
: str = None,
604 db_dict
: dict = None,
607 """Scale a resource in a Helm Chart.
610 kdu_instance: KDU instance name
611 scale: Scale to which to set the resource
612 resource_name: Resource name
613 total_timeout: The time, in seconds, to wait
614 cluster_uuid: The UUID of the cluster
615 kdu_model: The chart reference
616 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
617 The --wait flag will be set automatically if --atomic is used
618 db_dict: Dictionary for any additional data
619 kwargs: Additional parameters
622 True if successful, False otherwise
625 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
627 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
628 resource_name
, kdu_model
, cluster_uuid
631 self
.log
.debug(debug_mgs
)
633 # look for instance to obtain namespace
634 # get_instance_info function calls the sync command
635 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
636 if not instance_info
:
637 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
640 paths
, env
= self
._init
_paths
_env
(
641 cluster_name
=cluster_uuid
, create_if_not_exist
=True
645 kdu_model
, version
= self
._split
_version
(kdu_model
)
647 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
649 _
, replica_str
= await self
._get
_replica
_count
_url
(
650 kdu_model
, repo_url
, resource_name
653 command
= self
._get
_upgrade
_scale
_command
(
656 instance_info
["namespace"],
663 paths
["kube_config"],
666 self
.log
.debug("scaling: {}".format(command
))
669 # exec helm in a task
670 exec_task
= asyncio
.ensure_future(
671 coro_or_future
=self
._local
_async
_exec
(
672 command
=command
, raise_exception_on_error
=False, env
=env
675 # write status in another task
676 status_task
= asyncio
.ensure_future(
677 coro_or_future
=self
._store
_status
(
678 cluster_id
=cluster_uuid
,
679 kdu_instance
=kdu_instance
,
680 namespace
=instance_info
["namespace"],
687 # wait for execution task
688 await asyncio
.wait([exec_task
])
692 output
, rc
= exec_task
.result()
695 output
, rc
= await self
._local
_async
_exec
(
696 command
=command
, raise_exception_on_error
=False, env
=env
700 await self
._store
_status
(
701 cluster_id
=cluster_uuid
,
702 kdu_instance
=kdu_instance
,
703 namespace
=instance_info
["namespace"],
711 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
713 raise K8sException(msg
)
716 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
720 async def get_scale_count(
728 """Get a resource scale count.
731 cluster_uuid: The UUID of the cluster
732 resource_name: Resource name
733 kdu_instance: KDU instance name
734 kdu_model: The name or path of an Helm Chart
735 kwargs: Additional parameters
738 Resource instance count
742 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
745 # look for instance to obtain namespace
746 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
747 if not instance_info
:
748 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
751 paths
, env
= self
._init
_paths
_env
(
752 cluster_name
=cluster_uuid
, create_if_not_exist
=True
755 replicas
= await self
._get
_replica
_count
_instance
(
756 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
759 # Get default value if scale count is not found from provided values
761 repo_url
= await self
._find
_repo
(
762 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
764 replicas
, _
= await self
._get
_replica
_count
_url
(
765 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
769 msg
= "Replica count not found. Cannot be scaled"
771 raise K8sException(msg
)
776 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
779 "rollback kdu_instance {} to revision {} from cluster {}".format(
780 kdu_instance
, revision
, cluster_uuid
785 self
.fs
.sync(from_path
=cluster_uuid
)
787 # look for instance to obtain namespace
788 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
789 if not instance_info
:
790 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
793 paths
, env
= self
._init
_paths
_env
(
794 cluster_name
=cluster_uuid
, create_if_not_exist
=True
798 self
.fs
.sync(from_path
=cluster_uuid
)
800 command
= self
._get
_rollback
_command
(
801 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
804 self
.log
.debug("rolling_back: {}".format(command
))
806 # exec helm in a task
807 exec_task
= asyncio
.ensure_future(
808 coro_or_future
=self
._local
_async
_exec
(
809 command
=command
, raise_exception_on_error
=False, env
=env
812 # write status in another task
813 status_task
= asyncio
.ensure_future(
814 coro_or_future
=self
._store
_status
(
815 cluster_id
=cluster_uuid
,
816 kdu_instance
=kdu_instance
,
817 namespace
=instance_info
["namespace"],
819 operation
="rollback",
824 # wait for execution task
825 await asyncio
.wait([exec_task
])
830 output
, rc
= exec_task
.result()
833 await self
._store
_status
(
834 cluster_id
=cluster_uuid
,
835 kdu_instance
=kdu_instance
,
836 namespace
=instance_info
["namespace"],
838 operation
="rollback",
844 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
846 raise K8sException(msg
)
849 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
851 # return new revision number
852 instance
= await self
.get_instance_info(
853 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
856 revision
= int(instance
.get("revision"))
857 self
.log
.debug("New revision: {}".format(revision
))
862 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
864 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
865 (this call should happen after all _terminate-config-primitive_ of the VNF
868 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
869 :param kdu_instance: unique name for the KDU instance to be deleted
870 :param kwargs: Additional parameters (None yet)
871 :return: True if successful
875 "uninstall kdu_instance {} from cluster {}".format(
876 kdu_instance
, cluster_uuid
881 self
.fs
.sync(from_path
=cluster_uuid
)
883 # look for instance to obtain namespace
884 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
885 if not instance_info
:
886 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
889 paths
, env
= self
._init
_paths
_env
(
890 cluster_name
=cluster_uuid
, create_if_not_exist
=True
894 self
.fs
.sync(from_path
=cluster_uuid
)
896 command
= self
._get
_uninstall
_command
(
897 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
899 output
, _rc
= await self
._local
_async
_exec
(
900 command
=command
, raise_exception_on_error
=True, env
=env
904 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
906 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the deployed releases of a cluster

    Wraps the version-specific ``_instances_list`` between a ``fs.sync`` and a
    ``fs.reverse_sync`` of the cluster directory.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever ``_instances_list`` returns for that cluster
    """
    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    releases = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Find the release whose "name" equals kdu_instance in a cluster

    :param cluster_uuid: cluster to query through ``instances_list``
    :param kdu_instance: release name to look for
    :return: the first matching entry of the instances list, or None (after a
        debug message) when no entry matches
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    match = next(
        (entry for entry in releases if entry.get("name") == kdu_instance), None
    )
    if match is not None:
        return match
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
937 async def upgrade_charm(
941 charm_id
: str = None,
942 charm_type
: str = None,
943 timeout
: float = None,
945 """This method upgrade charms in VNFs
948 ee_id: Execution environment id
949 path: Local path to the charm
951 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
952 timeout: (Float) Timeout for the ns update operation
955 The output of the update operation if status equals to "completed"
957 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
959 async def exec_primitive(
961 cluster_uuid
: str = None,
962 kdu_instance
: str = None,
963 primitive_name
: str = None,
964 timeout
: float = 300,
966 db_dict
: dict = None,
969 """Exec primitive (Juju action)
971 :param cluster_uuid: The UUID of the cluster or namespace:cluster
972 :param kdu_instance: The unique name of the KDU instance
973 :param primitive_name: Name of action that will be executed
974 :param timeout: Timeout for action execution
975 :param params: Dictionary of all the parameters needed for the action
976 :db_dict: Dictionary for any additional data
977 :param kwargs: Additional parameters (None yet)
979 :return: Returns the output of the action
982 "KDUs deployed with Helm don't support actions "
983 "different from rollback, upgrade and status"
986 async def get_services(
987 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
990 Returns a list of services defined for the specified kdu instance.
992 :param cluster_uuid: UUID of a K8s cluster known by OSM
993 :param kdu_instance: unique name for the KDU instance
994 :param namespace: K8s namespace used by the KDU instance
995 :return: If successful, it will return a list of services, Each service
996 can have the following data:
997 - `name` of the service
998 - `type` type of service in the k8 cluster
999 - `ports` List of ports offered by the service, for each port includes at least
1000 name, port, protocol
1001 - `cluster_ip` Internal ip to be used inside k8s cluster
1002 - `external_ip` List of external ips (in case they are available)
1006 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1007 cluster_uuid
, kdu_instance
1012 paths
, env
= self
._init
_paths
_env
(
1013 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1017 self
.fs
.sync(from_path
=cluster_uuid
)
1019 # get list of services names for kdu
1020 service_names
= await self
._get
_services
(
1021 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1025 for service
in service_names
:
1026 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1027 service_list
.append(service
)
1030 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1034 async def get_service(
1035 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1039 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1040 service_name
, namespace
, cluster_uuid
1045 self
.fs
.sync(from_path
=cluster_uuid
)
1047 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1050 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1054 async def status_kdu(
1055 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1056 ) -> Union
[str, dict]:
1058 This call would retrieve tha current state of a given KDU instance. It would be
1059 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1060 values_ of the configuration parameters applied to a given instance. This call
1061 would be based on the `status` call.
1063 :param cluster_uuid: UUID of a K8s cluster known by OSM
1064 :param kdu_instance: unique name for the KDU instance
1065 :param kwargs: Additional parameters (None yet)
1066 :param yaml_format: if the return shall be returned as an YAML string or as a
1068 :return: If successful, it will return the following vector of arguments:
1069 - K8s `namespace` in the cluster where the KDU lives
1070 - `state` of the KDU instance. It can be:
1077 - List of `resources` (objects) that this release consists of, sorted by kind,
1078 and the status of those resources
1079 - Last `deployment_time`.
1083 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1084 cluster_uuid
, kdu_instance
1089 self
.fs
.sync(from_path
=cluster_uuid
)
1091 # get instance: needed to obtain namespace
1092 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1093 for instance
in instances
:
1094 if instance
.get("name") == kdu_instance
:
1097 # instance does not exist
1099 "Instance name: {} not found in cluster: {}".format(
1100 kdu_instance
, cluster_uuid
1104 status
= await self
._status
_kdu
(
1105 cluster_id
=cluster_uuid
,
1106 kdu_instance
=kdu_instance
,
1107 namespace
=instance
["namespace"],
1108 yaml_format
=yaml_format
,
1109 show_error_log
=True,
1113 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain the "values" of a KDU instance through ``_exec_get_command``

    :param kdu_instance: instance name forwarded to the get command
    :param namespace: namespace forwarded to the get command
    :param kubeconfig: kubeconfig forwarded to the get command
    :return: the result of ``_exec_get_command`` called with get_command="values"
    """
    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    request = {
        "get_command": "values",
        "kdu_instance": kdu_instance,
        "namespace": namespace,
        "kubeconfig": kubeconfig,
    }
    values = await self._exec_get_command(**request)
    return values
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Obtain the values of a Helm Chart package

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the result of ``_exec_inspect_command`` called with
            inspect_command="values"
    """
    trace = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(trace)

    chart_values = await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_values
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Obtain the readme of a Helm Chart package

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the result of ``_exec_inspect_command`` called with
            inspect_command="readme"
    """
    trace = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(trace)

    readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
1161 async def synchronize_repos(self
, cluster_uuid
: str):
1163 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1165 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1166 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1168 local_repo_list
= await self
.repo_list(cluster_uuid
)
1169 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1171 deleted_repo_list
= []
1172 added_repo_dict
= {}
1174 # iterate over the list of repos in the database that should be
1175 # added if not present
1176 for repo_name
, db_repo
in db_repo_dict
.items():
1178 # check if it is already present
1179 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1180 repo_id
= db_repo
.get("_id")
1181 if curr_repo_url
!= db_repo
["url"]:
1184 "repo {} url changed, delete and and again".format(
1188 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1189 deleted_repo_list
.append(repo_id
)
1192 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1193 if "ca_cert" in db_repo
:
1194 await self
.repo_add(
1198 cert
=db_repo
["ca_cert"],
1201 await self
.repo_add(
1206 added_repo_dict
[repo_id
] = db_repo
["name"]
1207 except Exception as e
:
1209 "Error adding repo id: {}, err_msg: {} ".format(
1214 # Delete repos that are present but not in nbi_list
1215 for repo_name
in local_repo_dict
:
1216 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1217 self
.log
.debug("delete repo {}".format(repo_name
))
1219 await self
.repo_remove(cluster_uuid
, repo_name
)
1220 deleted_repo_list
.append(repo_name
)
1221 except Exception as e
:
1223 "Error deleting repo, name: {}, err_msg: {}".format(
1228 return deleted_repo_list
, added_repo_dict
1230 except K8sException
:
1232 except Exception as e
:
1233 # Do not raise errors synchronizing repos
1234 self
.log
.error("Error synchronizing repos: {}".format(e
))
1235 raise Exception("Error synchronizing repos: {}".format(e
))
1237 def _get_db_repos_dict(self
, repo_ids
: list):
1239 for repo_id
in repo_ids
:
1240 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1241 db_repos_dict
[db_repo
["name"]] = db_repo
1242 return db_repos_dict
1245 ####################################################################################
1246 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1247 ####################################################################################
1251 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1253 Creates and returns base cluster and kube dirs and returns them.
1254 Also created helm3 dirs according to new directory specification, paths are
1255 not returned but assigned to helm environment variables
1257 :param cluster_name: cluster_name
1258 :return: Dictionary with config_paths and dictionary with helm environment variables
1262 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1264 Implements the helm version dependent cluster initialization
1268 async def _instances_list(self
, cluster_id
):
1270 Implements the helm version dependent helm instances list
1274 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1276 Implements the helm version dependent method to obtain services from a helm instance
1280 async def _status_kdu(
1284 namespace
: str = None,
1285 yaml_format
: bool = False,
1286 show_error_log
: bool = False,
1287 ) -> Union
[str, dict]:
1289 Implements the helm version dependent method to obtain status of a helm instance
1293 def _get_install_command(
1305 Obtain command to be executed to delete the indicated instance
1309 def _get_upgrade_scale_command(
1322 """Obtain command to be executed to upgrade the indicated instance."""
1325 def _get_upgrade_command(
1337 Obtain command to be executed to upgrade the indicated instance
1341 def _get_rollback_command(
1342 self
, kdu_instance
, namespace
, revision
, kubeconfig
1345 Obtain command to be executed to rollback the indicated instance
1349 def _get_uninstall_command(
1350 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1353 Obtain command to be executed to delete the indicated instance
1357 def _get_inspect_command(
1358 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1360 """Generates the command to obtain the information about an Helm Chart package
1361 (´helm show ...´ command)
1364 show_command: the second part of the command (`helm show <show_command>`)
1365 kdu_model: The name or path of an Helm Chart
1366 repo_url: Helm Chart repository url
1367 version: constraint with specific version of the Chart to use
1370 str: the generated Helm Chart command
1374 def _get_get_command(
1375 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1377 """Obtain command to be executed to get information about the kdu instance."""
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    on the helm version.
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: identifier of the cluster to clean up
    :param namespace: K8s namespace involved in the uninstallation
    """
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    No implementation is provided here; the helm-version-specific connector
    is expected to supply it.

    :param cluster_uuid: identifier of the cluster whose repositories are wanted
    :return: list of repository identifiers
    """
1395 ####################################################################################
1396 ################################### P R I V A T E ##################################
1397 ####################################################################################
1401 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1402 if os
.path
.exists(filename
):
1405 msg
= "File {} does not exist".format(filename
)
1406 if exception_if_not_exists
:
1407 raise K8sException(msg
)
1410 def _remove_multiple_spaces(strobj
):
1411 strobj
= strobj
.strip()
1412 while " " in strobj
:
1413 strobj
= strobj
.replace(" ", " ")
1417 def _output_to_lines(output
: str) -> list:
1418 output_lines
= list()
1419 lines
= output
.splitlines(keepends
=False)
1423 output_lines
.append(line
)
1427 def _output_to_table(output
: str) -> list:
1428 output_table
= list()
1429 lines
= output
.splitlines(keepends
=False)
1431 line
= line
.replace("\t", " ")
1433 output_table
.append(line_list
)
1434 cells
= line
.split(sep
=" ")
1438 line_list
.append(cell
)
1442 def _parse_services(output
: str) -> list:
1443 lines
= output
.splitlines(keepends
=False)
1446 line
= line
.replace("\t", " ")
1447 cells
= line
.split(sep
=" ")
1448 if len(cells
) > 0 and cells
[0].startswith("service/"):
1449 elems
= cells
[0].split(sep
="/")
1451 services
.append(elems
[1])
1455 def _get_deep(dictionary
: dict, members
: tuple):
1460 value
= target
.get(m
)
1469 # find key:value in several lines
1471 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1472 for line
in p_lines
:
1474 if line
.startswith(p_key
+ ":"):
1475 parts
= line
.split(":")
1476 the_value
= parts
[1].strip()
1484 def _lower_keys_list(input_list
: list):
1486 Transform the keys in a list of dictionaries to lower case and returns a new list
1491 for dictionary
in input_list
:
1492 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1493 new_list
.append(new_dict
)
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command asynchronously and collect its output.

    :param command: command line; it is space-normalised and split with shlex
    :param raise_exception_on_error: raise instead of returning when the
        command fails or cannot be executed
    :param show_error_log: log the output of a failing command
    :param encode_utf8: return the textual representation of the UTF-8 encoded
        output instead of the plain text
    :param env: extra environment variables, added on top of ``os.environ``
    :return: tuple (output, return_code). On success the output is stdout
        (stderr only when stdout is empty); on failure it is stderr (stdout
        only when stderr is empty). ("", -1) when the command could not be run
        and ``raise_exception_on_error`` is False
    :raises K8sException: on failure when ``raise_exception_on_error`` is True
    """
    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""
        # warnings printed on stderr by a successful command must not replace
        # the real result; error text is preferred only when the command fails
        if return_code == 0:
            output = out_text or err_text
        else:
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        # do not leave the child running when the awaiting task is cancelled
        if process is not None and process.returncode is None:
            process.kill()
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Run ``command1 | command2`` asynchronously and collect the output of the
    second command.

    :param command1: command whose stdout feeds the pipe
    :param command2: command that reads the pipe on stdin
    :param raise_exception_on_error: raise instead of returning when the
        second command fails or the commands cannot be executed
    :param show_error_log: log the output of a failing command
    :param encode_utf8: return the textual representation of the UTF-8 encoded
        output instead of the plain text
    :param env: extra environment variables, added on top of ``os.environ``
    :return: tuple (output, return_code) of the second command, or ("", -1)
        when execution was not possible and ``raise_exception_on_error`` is
        False
    :raises K8sException: on failure when ``raise_exception_on_error`` is True
    """
    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # descriptors still owned by this process; None once closed
    read_fd = None
    write_fd = None
    try:
        read_fd, write_fd = os.pipe()
        await asyncio.create_subprocess_exec(
            *command1, stdout=write_fd, env=environ
        )
        # the child holds its own copy; keeping ours open would prevent the
        # reader from ever seeing end-of-file
        os.close(write_fd)
        write_fd = None

        process_2 = await asyncio.create_subprocess_exec(
            *command2, stdin=read_fd, stdout=asyncio.subprocess.PIPE, env=environ
        )
        os.close(read_fd)
        read_fd = None

        stdout, stderr = await process_2.communicate()

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        # stderr is not piped above, so communicate() is expected to return
        # None for it; handled anyway in case that changes
        if stderr:
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
    finally:
        # release whatever is still open after an early failure
        for fd in (read_fd, write_fd):
            if fd is not None:
                os.close(fd)
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, and empty while the load
    balancer has not been assigned an ip yet
    :raises K8sException: when the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # the ingress list is absent while the load balancer is pending, and
        # an entry may carry only a hostname instead of an ip
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        service["external_ip"] = [
            elem["ip"] for elem in (ip_map_list or []) if elem.get("ip")
        ]

    return service
1682 async def _exec_get_command(
1683 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1685 """Obtains information about the kdu instance."""
1687 full_command
= self
._get
_get
_command
(
1688 get_command
, kdu_instance
, namespace
, kubeconfig
1691 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1695 async def _exec_inspect_command(
1696 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1698 """Obtains information about an Helm Chart package (´helm show´ command)
1701 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1702 kdu_model: The name or path of an Helm Chart
1703 repo_url: Helm Chart repository url
1706 str: the requested info about the Helm Chart package
1711 repo_str
= " --repo {}".format(repo_url
)
1713 idx
= kdu_model
.find("/")
1716 kdu_model
= kdu_model
[idx
:]
1718 kdu_model
, version
= self
._split
_version
(kdu_model
)
1720 version_str
= "--version {}".format(version
)
1724 full_command
= self
._get
_inspect
_command
(
1725 inspect_command
, kdu_model
, repo_str
, version_str
1728 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str = None,
    resource_name: str = None,
):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica count is looked
            up inside that section of the values instead of at the top level

    Returns:
        tuple: (replicas, replica_str) where `replicas` is the value found and
        `replica_str` is the key holding it, "replicaCount" or "replicas"

    Raises:
        K8sException: when the values (or the resource section) are missing or
            empty, when neither key holds a value, or when both keys exist
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
        Loader=yaml.SafeLoader,
    )

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time
    if "replicaCount" in kdu_values and "replicas" in kdu_values:
        msg = "replicaCount and replicas exist at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    # compare against None: a chart scaled down to 0 replicas is still scalable
    for replica_str in ("replicaCount", "replicas"):
        replicas = kdu_values.get(replica_str, None)
        if replicas is not None:
            return replicas, replica_str

    if resource_name:
        msg = (
            "replicaCount or replicas not found in the resource "
            "{} values in model {}. Cannot be scaled".format(
                resource_name, kdu_model
            )
        )
    else:
        msg = (
            "replicaCount or replicas not found in the values "
            "in model {}. Cannot be scaled".format(kdu_model)
        )
    self.log.error(msg)
    raise K8sException(msg)
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
):
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: passed through to `get_values_kdu`
        resource_name: Resource name; when it names a non-empty section of the
            values, the count is read from that section

    Returns:
        The value of `replicaCount`, or of `replicas` when the former is
        missing or falsy; None when the instance has no values or neither key
        holds a truthy value
    """

    raw_values = await self.get_values_kdu(kdu_instance, namespace, kubeconfig)
    kdu_values = yaml.load(raw_values, Loader=yaml.SafeLoader)

    if not kdu_values:
        return None

    # fall back to the top level when the resource section is absent or empty
    scope = kdu_values.get(resource_name, None) if resource_name else None
    if not scope:
        scope = kdu_values

    return scope.get("replicaCount", None) or scope.get("replicas", None)
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Poll the status of a KDU instance and write it to the database.

    Each iteration sleeps ``check_every`` seconds, reads the status with
    ``_status_kdu`` and stores it with ``write_app_status_to_db``. The loop
    ends when the write reports a falsy result, when the task is cancelled,
    or after the first iteration when ``run_once`` is True.

    :param cluster_id: cluster where the instance lives
    :param operation: operation name stored together with the status
    :param kdu_instance: name of the instance to poll
    :param namespace: K8s namespace of the instance
    :param check_every: seconds to wait before every status read
    :param db_dict: passed through to ``write_app_status_to_db``
    :param run_once: perform a single iteration only
    """
    while True:
        try:
            # the wait comes first, also for the run_once case
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )
            # a status without "info" raises here and is handled by the
            # generic handler below
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            # cancellation ends the task quietly; it is not re-raised
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # any other failure is logged and the polling goes on
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )
            pass
        finally:
            # runs on every path, so a run_once call returns after the first
            # iteration whatever its outcome
            if run_once:
                return
1897 # params for use in -f file
1898 # returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the parameters to a YAML values file for use with the ``-f`` option.

    String values starting with the ``!!yaml`` marker are parsed as YAML (the
    marker and the separator character following it are removed first); any
    other value is written as it is.

    :param cluster_id: cluster whose working paths are initialised before the
        file is written
    :param params: parameters to dump; None or empty means no file
    :return: tuple (option, filename): the ``-f <filename>`` option and the
        name of the file created, so that the caller can delete it at the
        end; ("", None) when there are no parameters
    """

    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    resolved = {}
    for key, value in params.items():
        # only strings can carry the marker, and only as a prefix; slicing
        # anything else would fail or mangle the value
        if isinstance(value, str) and value.startswith("!!yaml"):
            value = yaml.safe_load(value[7:])
        resolved[key] = value

    # ten-digit, zero-padded random number as file name
    values_file = str(random.randrange(1, 99999999)).rjust(10, "0") + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(resolved, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1926 # params for use in --set option
1928 def _params_to_set_option(params
: dict) -> str:
1930 if params
and len(params
) > 0:
1933 value
= params
.get(key
, None)
1934 if value
is not None:
1936 params_str
+= "--set "
1940 params_str
+= "{}={}".format(key
, value
)
def generate_kdu_instance_name(**kwargs):
    """
    Build an instance name from the chart name plus a random suffix.

    :param kwargs: must contain ``kdu_model``, the chart reference (name,
        absolute path or URL)
    :return: lower-case name made of the sanitised chart name (at most 35
        characters, prefixed with "a" when it does not start with a letter or
        is empty), a dash and a ten-digit zero-padded random number
    :raises KeyError: when ``kdu_model`` is not given
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last path component;
    # a trailing slash is ignored so that it does not produce an empty name
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    # anything that is not a letter or a digit becomes a dash
    name = "".join(
        c if (c.isalpha() or c.isnumeric()) else "-" for c in chart_name
    )[:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    suffix = str(random.randrange(1, 99999999)).rjust(10, "0")
    return "{}-{}".format(name, suffix).lower()
1979 def _split_version(self
, kdu_model
: str) -> (str, str):
1981 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
1982 parts
= kdu_model
.split(sep
=":")
1984 version
= str(parts
[1])
1985 kdu_model
= parts
[0]
1986 return kdu_model
, version
1988 async def _split_repo(self
, kdu_model
: str) -> str:
1990 idx
= kdu_model
.find("/")
1992 repo_name
= kdu_model
[:idx
]
1995 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1996 """Obtain the Helm repository for an Helm Chart
1999 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2000 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2003 str: the repository URL; if Helm Chart is a local one, the function returns None
2007 idx
= kdu_model
.find("/")
2009 repo_name
= kdu_model
[:idx
]
2010 # Find repository link
2011 local_repo_list
= await self
.repo_list(cluster_uuid
)
2012 for repo
in local_repo_list
:
2013 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None