Bug 2006 fixed: removed the while true from K8sHelmBaseConnector._store_status
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 def _get_namespace(self, cluster_uuid: str) -> str:
94 """
95 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 param: cluster_uuid: cluster's uuid
98 """
99
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
105
106 async def init_env(
107 self,
108 k8s_creds: str,
109 namespace: str = "kube-system",
110 reuse_cluster_uuid=None,
111 **kwargs,
112 ) -> (str, bool):
113 """
114 It prepares a given K8s cluster environment to run Charts
115
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 '.kube/config'
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
125 """
126
127 if reuse_cluster_uuid:
128 cluster_id = reuse_cluster_uuid
129 else:
130 cluster_id = str(uuid4())
131
132 self.log.debug(
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
134 )
135
136 paths, env = self._init_paths_env(
137 cluster_name=cluster_id, create_if_not_exist=True
138 )
139 mode = stat.S_IRUSR | stat.S_IWUSR
140 with open(paths["kube_config"], "w", mode) as f:
141 f.write(k8s_creds)
142 os.chmod(paths["kube_config"], 0o600)
143
144 # Code with initialization specific of helm version
145 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
146
147 # sync fs with local data
148 self.fs.reverse_sync(from_path=cluster_id)
149
150 self.log.info("Cluster {} initialized".format(cluster_id))
151
152 return cluster_id, n2vc_installed_sw
153
154 async def repo_add(
155 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
156 ):
157 self.log.debug(
158 "Cluster {}, adding {} repository {}. URL: {}".format(
159 cluster_uuid, repo_type, name, url
160 )
161 )
162
163 # init_env
164 paths, env = self._init_paths_env(
165 cluster_name=cluster_uuid, create_if_not_exist=True
166 )
167
168 # sync local dir
169 self.fs.sync(from_path=cluster_uuid)
170
171 # helm repo add name url
172 command = "env KUBECONFIG={} {} repo add {} {}".format(
173 paths["kube_config"], self._helm_command, name, url
174 )
175 self.log.debug("adding repo: {}".format(command))
176 await self._local_async_exec(
177 command=command, raise_exception_on_error=True, env=env
178 )
179
180 # helm repo update
181 command = "env KUBECONFIG={} {} repo update {}".format(
182 paths["kube_config"], self._helm_command, name
183 )
184 self.log.debug("updating repo: {}".format(command))
185 await self._local_async_exec(
186 command=command, raise_exception_on_error=False, env=env
187 )
188
189 # sync fs
190 self.fs.reverse_sync(from_path=cluster_uuid)
191
192 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
193 self.log.debug(
194 "Cluster {}, updating {} repository {}".format(
195 cluster_uuid, repo_type, name
196 )
197 )
198
199 # init_env
200 paths, env = self._init_paths_env(
201 cluster_name=cluster_uuid, create_if_not_exist=True
202 )
203
204 # sync local dir
205 self.fs.sync(from_path=cluster_uuid)
206
207 # helm repo update
208 command = "{} repo update {}".format(self._helm_command, name)
209 self.log.debug("updating repo: {}".format(command))
210 await self._local_async_exec(
211 command=command, raise_exception_on_error=False, env=env
212 )
213
214 # sync fs
215 self.fs.reverse_sync(from_path=cluster_uuid)
216
217 async def repo_list(self, cluster_uuid: str) -> list:
218 """
219 Get the list of registered repositories
220
221 :return: list of registered repositories: [ (name, url) .... ]
222 """
223
224 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
225
226 # config filename
227 paths, env = self._init_paths_env(
228 cluster_name=cluster_uuid, create_if_not_exist=True
229 )
230
231 # sync local dir
232 self.fs.sync(from_path=cluster_uuid)
233
234 command = "env KUBECONFIG={} {} repo list --output yaml".format(
235 paths["kube_config"], self._helm_command
236 )
237
238 # Set exception to false because if there are no repos just want an empty list
239 output, _rc = await self._local_async_exec(
240 command=command, raise_exception_on_error=False, env=env
241 )
242
243 # sync fs
244 self.fs.reverse_sync(from_path=cluster_uuid)
245
246 if _rc == 0:
247 if output and len(output) > 0:
248 repos = yaml.load(output, Loader=yaml.SafeLoader)
249 # unify format between helm2 and helm3 setting all keys lowercase
250 return self._lower_keys_list(repos)
251 else:
252 return []
253 else:
254 return []
255
256 async def repo_remove(self, cluster_uuid: str, name: str):
257 self.log.debug(
258 "remove {} repositories for cluster {}".format(name, cluster_uuid)
259 )
260
261 # init env, paths
262 paths, env = self._init_paths_env(
263 cluster_name=cluster_uuid, create_if_not_exist=True
264 )
265
266 # sync local dir
267 self.fs.sync(from_path=cluster_uuid)
268
269 command = "env KUBECONFIG={} {} repo remove {}".format(
270 paths["kube_config"], self._helm_command, name
271 )
272 await self._local_async_exec(
273 command=command, raise_exception_on_error=True, env=env
274 )
275
276 # sync fs
277 self.fs.reverse_sync(from_path=cluster_uuid)
278
279 async def reset(
280 self,
281 cluster_uuid: str,
282 force: bool = False,
283 uninstall_sw: bool = False,
284 **kwargs,
285 ) -> bool:
286 """Reset a cluster
287
288 Resets the Kubernetes cluster by removing the helm deployment that represents it.
289
290 :param cluster_uuid: The UUID of the cluster to reset
291 :param force: Boolean to force the reset
292 :param uninstall_sw: Boolean to force the reset
293 :param kwargs: Additional parameters (None yet)
294 :return: Returns True if successful or raises an exception.
295 """
296 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
297 self.log.debug(
298 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
299 cluster_uuid, uninstall_sw
300 )
301 )
302
303 # sync local dir
304 self.fs.sync(from_path=cluster_uuid)
305
306 # uninstall releases if needed.
307 if uninstall_sw:
308 releases = await self.instances_list(cluster_uuid=cluster_uuid)
309 if len(releases) > 0:
310 if force:
311 for r in releases:
312 try:
313 kdu_instance = r.get("name")
314 chart = r.get("chart")
315 self.log.debug(
316 "Uninstalling {} -> {}".format(chart, kdu_instance)
317 )
318 await self.uninstall(
319 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
320 )
321 except Exception as e:
322 # will not raise exception as it was found
323 # that in some cases of previously installed helm releases it
324 # raised an error
325 self.log.warn(
326 "Error uninstalling release {}: {}".format(
327 kdu_instance, e
328 )
329 )
330 else:
331 msg = (
332 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
333 ).format(cluster_uuid)
334 self.log.warn(msg)
335 uninstall_sw = (
336 False # Allow to remove k8s cluster without removing Tiller
337 )
338
339 if uninstall_sw:
340 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
341
342 # delete cluster directory
343 self.log.debug("Removing directory {}".format(cluster_uuid))
344 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
345 # Remove also local directorio if still exist
346 direct = self.fs.path + "/" + cluster_uuid
347 shutil.rmtree(direct, ignore_errors=True)
348
349 return True
350
351 async def _install_impl(
352 self,
353 cluster_id: str,
354 kdu_model: str,
355 paths: dict,
356 env: dict,
357 kdu_instance: str,
358 atomic: bool = True,
359 timeout: float = 300,
360 params: dict = None,
361 db_dict: dict = None,
362 kdu_name: str = None,
363 namespace: str = None,
364 ):
365 # init env, paths
366 paths, env = self._init_paths_env(
367 cluster_name=cluster_id, create_if_not_exist=True
368 )
369
370 # params to str
371 params_str, file_to_delete = self._params_to_file_option(
372 cluster_id=cluster_id, params=params
373 )
374
375 # version
376 version = None
377 if ":" in kdu_model:
378 parts = kdu_model.split(sep=":")
379 if len(parts) == 2:
380 version = str(parts[1])
381 kdu_model = parts[0]
382
383 repo = self._split_repo(kdu_model)
384 if repo:
385 self.repo_update(cluster_id, repo)
386
387 command = self._get_install_command(
388 kdu_model,
389 kdu_instance,
390 namespace,
391 params_str,
392 version,
393 atomic,
394 timeout,
395 paths["kube_config"],
396 )
397
398 self.log.debug("installing: {}".format(command))
399
400 if atomic:
401 # exec helm in a task
402 exec_task = asyncio.ensure_future(
403 coro_or_future=self._local_async_exec(
404 command=command, raise_exception_on_error=False, env=env
405 )
406 )
407
408 # write status in another task
409 status_task = asyncio.ensure_future(
410 coro_or_future=self._store_status(
411 cluster_id=cluster_id,
412 kdu_instance=kdu_instance,
413 namespace=namespace,
414 db_dict=db_dict,
415 operation="install",
416 )
417 )
418
419 # wait for execution task
420 await asyncio.wait([exec_task])
421
422 # cancel status task
423 status_task.cancel()
424
425 output, rc = exec_task.result()
426
427 else:
428
429 output, rc = await self._local_async_exec(
430 command=command, raise_exception_on_error=False, env=env
431 )
432
433 # remove temporal values yaml file
434 if file_to_delete:
435 os.remove(file_to_delete)
436
437 # write final status
438 await self._store_status(
439 cluster_id=cluster_id,
440 kdu_instance=kdu_instance,
441 namespace=namespace,
442 db_dict=db_dict,
443 operation="install",
444 )
445
446 if rc != 0:
447 msg = "Error executing command: {}\nOutput: {}".format(command, output)
448 self.log.error(msg)
449 raise K8sException(msg)
450
451 async def upgrade(
452 self,
453 cluster_uuid: str,
454 kdu_instance: str,
455 kdu_model: str = None,
456 atomic: bool = True,
457 timeout: float = 300,
458 params: dict = None,
459 db_dict: dict = None,
460 ):
461 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
462
463 # sync local dir
464 self.fs.sync(from_path=cluster_uuid)
465
466 # look for instance to obtain namespace
467 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
468 if not instance_info:
469 raise K8sException("kdu_instance {} not found".format(kdu_instance))
470
471 # init env, paths
472 paths, env = self._init_paths_env(
473 cluster_name=cluster_uuid, create_if_not_exist=True
474 )
475
476 # sync local dir
477 self.fs.sync(from_path=cluster_uuid)
478
479 # params to str
480 params_str, file_to_delete = self._params_to_file_option(
481 cluster_id=cluster_uuid, params=params
482 )
483
484 # version
485 version = None
486 if ":" in kdu_model:
487 parts = kdu_model.split(sep=":")
488 if len(parts) == 2:
489 version = str(parts[1])
490 kdu_model = parts[0]
491
492 repo = self._split_repo(kdu_model)
493 if repo:
494 self.repo_update(cluster_uuid, repo)
495
496 command = self._get_upgrade_command(
497 kdu_model,
498 kdu_instance,
499 instance_info["namespace"],
500 params_str,
501 version,
502 atomic,
503 timeout,
504 paths["kube_config"],
505 )
506
507 self.log.debug("upgrading: {}".format(command))
508
509 if atomic:
510
511 # exec helm in a task
512 exec_task = asyncio.ensure_future(
513 coro_or_future=self._local_async_exec(
514 command=command, raise_exception_on_error=False, env=env
515 )
516 )
517 # write status in another task
518 status_task = asyncio.ensure_future(
519 coro_or_future=self._store_status(
520 cluster_id=cluster_uuid,
521 kdu_instance=kdu_instance,
522 namespace=instance_info["namespace"],
523 db_dict=db_dict,
524 operation="upgrade",
525 )
526 )
527
528 # wait for execution task
529 await asyncio.wait([exec_task])
530
531 # cancel status task
532 status_task.cancel()
533 output, rc = exec_task.result()
534
535 else:
536
537 output, rc = await self._local_async_exec(
538 command=command, raise_exception_on_error=False, env=env
539 )
540
541 # remove temporal values yaml file
542 if file_to_delete:
543 os.remove(file_to_delete)
544
545 # write final status
546 await self._store_status(
547 cluster_id=cluster_uuid,
548 kdu_instance=kdu_instance,
549 namespace=instance_info["namespace"],
550 db_dict=db_dict,
551 operation="upgrade",
552 )
553
554 if rc != 0:
555 msg = "Error executing command: {}\nOutput: {}".format(command, output)
556 self.log.error(msg)
557 raise K8sException(msg)
558
559 # sync fs
560 self.fs.reverse_sync(from_path=cluster_uuid)
561
562 # return new revision number
563 instance = await self.get_instance_info(
564 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
565 )
566 if instance:
567 revision = int(instance.get("revision"))
568 self.log.debug("New revision: {}".format(revision))
569 return revision
570 else:
571 return 0
572
573 async def scale(
574 self,
575 kdu_instance: str,
576 scale: int,
577 resource_name: str,
578 total_timeout: float = 1800,
579 **kwargs,
580 ):
581 raise NotImplementedError("Method not implemented")
582
583 async def get_scale_count(
584 self,
585 resource_name: str,
586 kdu_instance: str,
587 **kwargs,
588 ):
589 raise NotImplementedError("Method not implemented")
590
591 async def rollback(
592 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
593 ):
594 self.log.debug(
595 "rollback kdu_instance {} to revision {} from cluster {}".format(
596 kdu_instance, revision, cluster_uuid
597 )
598 )
599
600 # sync local dir
601 self.fs.sync(from_path=cluster_uuid)
602
603 # look for instance to obtain namespace
604 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
605 if not instance_info:
606 raise K8sException("kdu_instance {} not found".format(kdu_instance))
607
608 # init env, paths
609 paths, env = self._init_paths_env(
610 cluster_name=cluster_uuid, create_if_not_exist=True
611 )
612
613 # sync local dir
614 self.fs.sync(from_path=cluster_uuid)
615
616 command = self._get_rollback_command(
617 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
618 )
619
620 self.log.debug("rolling_back: {}".format(command))
621
622 # exec helm in a task
623 exec_task = asyncio.ensure_future(
624 coro_or_future=self._local_async_exec(
625 command=command, raise_exception_on_error=False, env=env
626 )
627 )
628 # write status in another task
629 status_task = asyncio.ensure_future(
630 coro_or_future=self._store_status(
631 cluster_id=cluster_uuid,
632 kdu_instance=kdu_instance,
633 namespace=instance_info["namespace"],
634 db_dict=db_dict,
635 operation="rollback",
636 )
637 )
638
639 # wait for execution task
640 await asyncio.wait([exec_task])
641
642 # cancel status task
643 status_task.cancel()
644
645 output, rc = exec_task.result()
646
647 # write final status
648 await self._store_status(
649 cluster_id=cluster_uuid,
650 kdu_instance=kdu_instance,
651 namespace=instance_info["namespace"],
652 db_dict=db_dict,
653 operation="rollback",
654 )
655
656 if rc != 0:
657 msg = "Error executing command: {}\nOutput: {}".format(command, output)
658 self.log.error(msg)
659 raise K8sException(msg)
660
661 # sync fs
662 self.fs.reverse_sync(from_path=cluster_uuid)
663
664 # return new revision number
665 instance = await self.get_instance_info(
666 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
667 )
668 if instance:
669 revision = int(instance.get("revision"))
670 self.log.debug("New revision: {}".format(revision))
671 return revision
672 else:
673 return 0
674
675 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
676 """
677 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
678 (this call should happen after all _terminate-config-primitive_ of the VNF
679 are invoked).
680
681 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
682 :param kdu_instance: unique name for the KDU instance to be deleted
683 :param kwargs: Additional parameters (None yet)
684 :return: True if successful
685 """
686
687 self.log.debug(
688 "uninstall kdu_instance {} from cluster {}".format(
689 kdu_instance, cluster_uuid
690 )
691 )
692
693 # sync local dir
694 self.fs.sync(from_path=cluster_uuid)
695
696 # look for instance to obtain namespace
697 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
698 if not instance_info:
699 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
700 return True
701 # init env, paths
702 paths, env = self._init_paths_env(
703 cluster_name=cluster_uuid, create_if_not_exist=True
704 )
705
706 # sync local dir
707 self.fs.sync(from_path=cluster_uuid)
708
709 command = self._get_uninstall_command(
710 kdu_instance, instance_info["namespace"], paths["kube_config"]
711 )
712 output, _rc = await self._local_async_exec(
713 command=command, raise_exception_on_error=True, env=env
714 )
715
716 # sync fs
717 self.fs.reverse_sync(from_path=cluster_uuid)
718
719 return self._output_to_table(output)
720
721 async def instances_list(self, cluster_uuid: str) -> list:
722 """
723 returns a list of deployed releases in a cluster
724
725 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
726 :return:
727 """
728
729 self.log.debug("list releases for cluster {}".format(cluster_uuid))
730
731 # sync local dir
732 self.fs.sync(from_path=cluster_uuid)
733
734 # execute internal command
735 result = await self._instances_list(cluster_uuid)
736
737 # sync fs
738 self.fs.reverse_sync(from_path=cluster_uuid)
739
740 return result
741
742 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
743 instances = await self.instances_list(cluster_uuid=cluster_uuid)
744 for instance in instances:
745 if instance.get("name") == kdu_instance:
746 return instance
747 self.log.debug("Instance {} not found".format(kdu_instance))
748 return None
749
750 async def exec_primitive(
751 self,
752 cluster_uuid: str = None,
753 kdu_instance: str = None,
754 primitive_name: str = None,
755 timeout: float = 300,
756 params: dict = None,
757 db_dict: dict = None,
758 **kwargs,
759 ) -> str:
760 """Exec primitive (Juju action)
761
762 :param cluster_uuid: The UUID of the cluster or namespace:cluster
763 :param kdu_instance: The unique name of the KDU instance
764 :param primitive_name: Name of action that will be executed
765 :param timeout: Timeout for action execution
766 :param params: Dictionary of all the parameters needed for the action
767 :db_dict: Dictionary for any additional data
768 :param kwargs: Additional parameters (None yet)
769
770 :return: Returns the output of the action
771 """
772 raise K8sException(
773 "KDUs deployed with Helm don't support actions "
774 "different from rollback, upgrade and status"
775 )
776
777 async def get_services(
778 self, cluster_uuid: str, kdu_instance: str, namespace: str
779 ) -> list:
780 """
781 Returns a list of services defined for the specified kdu instance.
782
783 :param cluster_uuid: UUID of a K8s cluster known by OSM
784 :param kdu_instance: unique name for the KDU instance
785 :param namespace: K8s namespace used by the KDU instance
786 :return: If successful, it will return a list of services, Each service
787 can have the following data:
788 - `name` of the service
789 - `type` type of service in the k8 cluster
790 - `ports` List of ports offered by the service, for each port includes at least
791 name, port, protocol
792 - `cluster_ip` Internal ip to be used inside k8s cluster
793 - `external_ip` List of external ips (in case they are available)
794 """
795
796 self.log.debug(
797 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
798 cluster_uuid, kdu_instance
799 )
800 )
801
802 # init env, paths
803 paths, env = self._init_paths_env(
804 cluster_name=cluster_uuid, create_if_not_exist=True
805 )
806
807 # sync local dir
808 self.fs.sync(from_path=cluster_uuid)
809
810 # get list of services names for kdu
811 service_names = await self._get_services(
812 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
813 )
814
815 service_list = []
816 for service in service_names:
817 service = await self._get_service(cluster_uuid, service, namespace)
818 service_list.append(service)
819
820 # sync fs
821 self.fs.reverse_sync(from_path=cluster_uuid)
822
823 return service_list
824
825 async def get_service(
826 self, cluster_uuid: str, service_name: str, namespace: str
827 ) -> object:
828
829 self.log.debug(
830 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
831 service_name, namespace, cluster_uuid
832 )
833 )
834
835 # sync local dir
836 self.fs.sync(from_path=cluster_uuid)
837
838 service = await self._get_service(cluster_uuid, service_name, namespace)
839
840 # sync fs
841 self.fs.reverse_sync(from_path=cluster_uuid)
842
843 return service
844
845 async def status_kdu(
846 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
847 ) -> Union[str, dict]:
848 """
849 This call would retrieve tha current state of a given KDU instance. It would be
850 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
851 values_ of the configuration parameters applied to a given instance. This call
852 would be based on the `status` call.
853
854 :param cluster_uuid: UUID of a K8s cluster known by OSM
855 :param kdu_instance: unique name for the KDU instance
856 :param kwargs: Additional parameters (None yet)
857 :param yaml_format: if the return shall be returned as an YAML string or as a
858 dictionary
859 :return: If successful, it will return the following vector of arguments:
860 - K8s `namespace` in the cluster where the KDU lives
861 - `state` of the KDU instance. It can be:
862 - UNKNOWN
863 - DEPLOYED
864 - DELETED
865 - SUPERSEDED
866 - FAILED or
867 - DELETING
868 - List of `resources` (objects) that this release consists of, sorted by kind,
869 and the status of those resources
870 - Last `deployment_time`.
871
872 """
873 self.log.debug(
874 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
875 cluster_uuid, kdu_instance
876 )
877 )
878
879 # sync local dir
880 self.fs.sync(from_path=cluster_uuid)
881
882 # get instance: needed to obtain namespace
883 instances = await self._instances_list(cluster_id=cluster_uuid)
884 for instance in instances:
885 if instance.get("name") == kdu_instance:
886 break
887 else:
888 # instance does not exist
889 raise K8sException(
890 "Instance name: {} not found in cluster: {}".format(
891 kdu_instance, cluster_uuid
892 )
893 )
894
895 status = await self._status_kdu(
896 cluster_id=cluster_uuid,
897 kdu_instance=kdu_instance,
898 namespace=instance["namespace"],
899 yaml_format=yaml_format,
900 show_error_log=True,
901 )
902
903 # sync fs
904 self.fs.reverse_sync(from_path=cluster_uuid)
905
906 return status
907
908 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
909
910 self.log.debug(
911 "inspect kdu_model values {} from (optional) repo: {}".format(
912 kdu_model, repo_url
913 )
914 )
915
916 return await self._exec_inspect_comand(
917 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
918 )
919
920 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
921
922 self.log.debug(
923 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
924 )
925
926 return await self._exec_inspect_comand(
927 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
928 )
929
930 async def synchronize_repos(self, cluster_uuid: str):
931
932 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
933 try:
934 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
935 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
936
937 local_repo_list = await self.repo_list(cluster_uuid)
938 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
939
940 deleted_repo_list = []
941 added_repo_dict = {}
942
943 # iterate over the list of repos in the database that should be
944 # added if not present
945 for repo_name, db_repo in db_repo_dict.items():
946 try:
947 # check if it is already present
948 curr_repo_url = local_repo_dict.get(db_repo["name"])
949 repo_id = db_repo.get("_id")
950 if curr_repo_url != db_repo["url"]:
951 if curr_repo_url:
952 self.log.debug(
953 "repo {} url changed, delete and and again".format(
954 db_repo["url"]
955 )
956 )
957 await self.repo_remove(cluster_uuid, db_repo["name"])
958 deleted_repo_list.append(repo_id)
959
960 # add repo
961 self.log.debug("add repo {}".format(db_repo["name"]))
962 await self.repo_add(
963 cluster_uuid, db_repo["name"], db_repo["url"]
964 )
965 added_repo_dict[repo_id] = db_repo["name"]
966 except Exception as e:
967 raise K8sException(
968 "Error adding repo id: {}, err_msg: {} ".format(
969 repo_id, repr(e)
970 )
971 )
972
973 # Delete repos that are present but not in nbi_list
974 for repo_name in local_repo_dict:
975 if not db_repo_dict.get(repo_name) and repo_name != "stable":
976 self.log.debug("delete repo {}".format(repo_name))
977 try:
978 await self.repo_remove(cluster_uuid, repo_name)
979 deleted_repo_list.append(repo_name)
980 except Exception as e:
981 self.warning(
982 "Error deleting repo, name: {}, err_msg: {}".format(
983 repo_name, str(e)
984 )
985 )
986
987 return deleted_repo_list, added_repo_dict
988
989 except K8sException:
990 raise
991 except Exception as e:
992 # Do not raise errors synchronizing repos
993 self.log.error("Error synchronizing repos: {}".format(e))
994 raise Exception("Error synchronizing repos: {}".format(e))
995
996 def _get_db_repos_dict(self, repo_ids: list):
997 db_repos_dict = {}
998 for repo_id in repo_ids:
999 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1000 db_repos_dict[db_repo["name"]] = db_repo
1001 return db_repos_dict
1002
1003 """
1004 ####################################################################################
1005 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1006 ####################################################################################
1007 """
1008
1009 @abc.abstractmethod
1010 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1011 """
1012 Creates and returns base cluster and kube dirs and returns them.
1013 Also created helm3 dirs according to new directory specification, paths are
1014 not returned but assigned to helm environment variables
1015
1016 :param cluster_name: cluster_name
1017 :return: Dictionary with config_paths and dictionary with helm environment variables
1018 """
1019
1020 @abc.abstractmethod
1021 async def _cluster_init(self, cluster_id, namespace, paths, env):
1022 """
1023 Implements the helm version dependent cluster initialization
1024 """
1025
1026 @abc.abstractmethod
1027 async def _instances_list(self, cluster_id):
1028 """
1029 Implements the helm version dependent helm instances list
1030 """
1031
1032 @abc.abstractmethod
1033 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1034 """
1035 Implements the helm version dependent method to obtain services from a helm instance
1036 """
1037
1038 @abc.abstractmethod
1039 async def _status_kdu(
1040 self,
1041 cluster_id: str,
1042 kdu_instance: str,
1043 namespace: str = None,
1044 yaml_format: bool = False,
1045 show_error_log: bool = False,
1046 ) -> Union[str, dict]:
1047 """
1048 Implements the helm version dependent method to obtain status of a helm instance
1049 """
1050
1051 @abc.abstractmethod
1052 def _get_install_command(
1053 self,
1054 kdu_model,
1055 kdu_instance,
1056 namespace,
1057 params_str,
1058 version,
1059 atomic,
1060 timeout,
1061 kubeconfig,
1062 ) -> str:
1063 """
1064 Obtain command to be executed to delete the indicated instance
1065 """
1066
1067 @abc.abstractmethod
1068 def _get_upgrade_command(
1069 self,
1070 kdu_model,
1071 kdu_instance,
1072 namespace,
1073 params_str,
1074 version,
1075 atomic,
1076 timeout,
1077 kubeconfig,
1078 ) -> str:
1079 """
1080 Obtain command to be executed to upgrade the indicated instance
1081 """
1082
1083 @abc.abstractmethod
1084 def _get_rollback_command(
1085 self, kdu_instance, namespace, revision, kubeconfig
1086 ) -> str:
1087 """
1088 Obtain command to be executed to rollback the indicated instance
1089 """
1090
1091 @abc.abstractmethod
1092 def _get_uninstall_command(
1093 self, kdu_instance: str, namespace: str, kubeconfig: str
1094 ) -> str:
1095 """
1096 Obtain command to be executed to delete the indicated instance
1097 """
1098
1099 @abc.abstractmethod
1100 def _get_inspect_command(
1101 self, show_command: str, kdu_model: str, repo_str: str, version: str
1102 ):
1103 """
1104 Obtain command to be executed to obtain information about the kdu
1105 """
1106
1107 @abc.abstractmethod
1108 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1109 """
1110 Method call to uninstall cluster software for helm. This method is dependent
1111 of helm version
1112 For Helm v2 it will be called when Tiller must be uninstalled
1113 For Helm v3 it does nothing and does not need to be callled
1114 """
1115
1116 @abc.abstractmethod
1117 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1118 """
1119 Obtains the cluster repos identifiers
1120 """
1121
1122 """
1123 ####################################################################################
1124 ################################### P R I V A T E ##################################
1125 ####################################################################################
1126 """
1127
1128 @staticmethod
1129 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1130 if os.path.exists(filename):
1131 return True
1132 else:
1133 msg = "File {} does not exist".format(filename)
1134 if exception_if_not_exists:
1135 raise K8sException(msg)
1136
1137 @staticmethod
1138 def _remove_multiple_spaces(strobj):
1139 strobj = strobj.strip()
1140 while " " in strobj:
1141 strobj = strobj.replace(" ", " ")
1142 return strobj
1143
1144 @staticmethod
1145 def _output_to_lines(output: str) -> list:
1146 output_lines = list()
1147 lines = output.splitlines(keepends=False)
1148 for line in lines:
1149 line = line.strip()
1150 if len(line) > 0:
1151 output_lines.append(line)
1152 return output_lines
1153
1154 @staticmethod
1155 def _output_to_table(output: str) -> list:
1156 output_table = list()
1157 lines = output.splitlines(keepends=False)
1158 for line in lines:
1159 line = line.replace("\t", " ")
1160 line_list = list()
1161 output_table.append(line_list)
1162 cells = line.split(sep=" ")
1163 for cell in cells:
1164 cell = cell.strip()
1165 if len(cell) > 0:
1166 line_list.append(cell)
1167 return output_table
1168
1169 @staticmethod
1170 def _parse_services(output: str) -> list:
1171 lines = output.splitlines(keepends=False)
1172 services = []
1173 for line in lines:
1174 line = line.replace("\t", " ")
1175 cells = line.split(sep=" ")
1176 if len(cells) > 0 and cells[0].startswith("service/"):
1177 elems = cells[0].split(sep="/")
1178 if len(elems) > 1:
1179 services.append(elems[1])
1180 return services
1181
1182 @staticmethod
1183 def _get_deep(dictionary: dict, members: tuple):
1184 target = dictionary
1185 value = None
1186 try:
1187 for m in members:
1188 value = target.get(m)
1189 if not value:
1190 return None
1191 else:
1192 target = value
1193 except Exception:
1194 pass
1195 return value
1196
1197 # find key:value in several lines
1198 @staticmethod
1199 def _find_in_lines(p_lines: list, p_key: str) -> str:
1200 for line in p_lines:
1201 try:
1202 if line.startswith(p_key + ":"):
1203 parts = line.split(":")
1204 the_value = parts[1].strip()
1205 return the_value
1206 except Exception:
1207 # ignore it
1208 pass
1209 return None
1210
1211 @staticmethod
1212 def _lower_keys_list(input_list: list):
1213 """
1214 Transform the keys in a list of dictionaries to lower case and returns a new list
1215 of dictionaries
1216 """
1217 new_list = []
1218 if input_list:
1219 for dictionary in input_list:
1220 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1221 new_list.append(new_dict)
1222 return new_list
1223
1224 async def _local_async_exec(
1225 self,
1226 command: str,
1227 raise_exception_on_error: bool = False,
1228 show_error_log: bool = True,
1229 encode_utf8: bool = False,
1230 env: dict = None,
1231 ) -> (str, int):
1232
1233 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1234 self.log.debug(
1235 "Executing async local command: {}, env: {}".format(command, env)
1236 )
1237
1238 # split command
1239 command = shlex.split(command)
1240
1241 environ = os.environ.copy()
1242 if env:
1243 environ.update(env)
1244
1245 try:
1246 process = await asyncio.create_subprocess_exec(
1247 *command,
1248 stdout=asyncio.subprocess.PIPE,
1249 stderr=asyncio.subprocess.PIPE,
1250 env=environ,
1251 )
1252
1253 # wait for command terminate
1254 stdout, stderr = await process.communicate()
1255
1256 return_code = process.returncode
1257
1258 output = ""
1259 if stdout:
1260 output = stdout.decode("utf-8").strip()
1261 # output = stdout.decode()
1262 if stderr:
1263 output = stderr.decode("utf-8").strip()
1264 # output = stderr.decode()
1265
1266 if return_code != 0 and show_error_log:
1267 self.log.debug(
1268 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1269 )
1270 else:
1271 self.log.debug("Return code: {}".format(return_code))
1272
1273 if raise_exception_on_error and return_code != 0:
1274 raise K8sException(output)
1275
1276 if encode_utf8:
1277 output = output.encode("utf-8").strip()
1278 output = str(output).replace("\\n", "\n")
1279
1280 return output, return_code
1281
1282 except asyncio.CancelledError:
1283 raise
1284 except K8sException:
1285 raise
1286 except Exception as e:
1287 msg = "Exception executing command: {} -> {}".format(command, e)
1288 self.log.error(msg)
1289 if raise_exception_on_error:
1290 raise K8sException(e) from e
1291 else:
1292 return "", -1
1293
1294 async def _local_async_exec_pipe(
1295 self,
1296 command1: str,
1297 command2: str,
1298 raise_exception_on_error: bool = True,
1299 show_error_log: bool = True,
1300 encode_utf8: bool = False,
1301 env: dict = None,
1302 ):
1303
1304 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1305 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1306 command = "{} | {}".format(command1, command2)
1307 self.log.debug(
1308 "Executing async local command: {}, env: {}".format(command, env)
1309 )
1310
1311 # split command
1312 command1 = shlex.split(command1)
1313 command2 = shlex.split(command2)
1314
1315 environ = os.environ.copy()
1316 if env:
1317 environ.update(env)
1318
1319 try:
1320 read, write = os.pipe()
1321 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1322 os.close(write)
1323 process_2 = await asyncio.create_subprocess_exec(
1324 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1325 )
1326 os.close(read)
1327 stdout, stderr = await process_2.communicate()
1328
1329 return_code = process_2.returncode
1330
1331 output = ""
1332 if stdout:
1333 output = stdout.decode("utf-8").strip()
1334 # output = stdout.decode()
1335 if stderr:
1336 output = stderr.decode("utf-8").strip()
1337 # output = stderr.decode()
1338
1339 if return_code != 0 and show_error_log:
1340 self.log.debug(
1341 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1342 )
1343 else:
1344 self.log.debug("Return code: {}".format(return_code))
1345
1346 if raise_exception_on_error and return_code != 0:
1347 raise K8sException(output)
1348
1349 if encode_utf8:
1350 output = output.encode("utf-8").strip()
1351 output = str(output).replace("\\n", "\n")
1352
1353 return output, return_code
1354 except asyncio.CancelledError:
1355 raise
1356 except K8sException:
1357 raise
1358 except Exception as e:
1359 msg = "Exception executing command: {} -> {}".format(command, e)
1360 self.log.error(msg)
1361 if raise_exception_on_error:
1362 raise K8sException(e) from e
1363 else:
1364 return "", -1
1365
1366 async def _get_service(self, cluster_id, service_name, namespace):
1367 """
1368 Obtains the data of the specified service in the k8cluster.
1369
1370 :param cluster_id: id of a K8s cluster known by OSM
1371 :param service_name: name of the K8s service in the specified namespace
1372 :param namespace: K8s namespace used by the KDU instance
1373 :return: If successful, it will return a service with the following data:
1374 - `name` of the service
1375 - `type` type of service in the k8 cluster
1376 - `ports` List of ports offered by the service, for each port includes at least
1377 name, port, protocol
1378 - `cluster_ip` Internal ip to be used inside k8s cluster
1379 - `external_ip` List of external ips (in case they are available)
1380 """
1381
1382 # init config, env
1383 paths, env = self._init_paths_env(
1384 cluster_name=cluster_id, create_if_not_exist=True
1385 )
1386
1387 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1388 self.kubectl_command, paths["kube_config"], namespace, service_name
1389 )
1390
1391 output, _rc = await self._local_async_exec(
1392 command=command, raise_exception_on_error=True, env=env
1393 )
1394
1395 data = yaml.load(output, Loader=yaml.SafeLoader)
1396
1397 service = {
1398 "name": service_name,
1399 "type": self._get_deep(data, ("spec", "type")),
1400 "ports": self._get_deep(data, ("spec", "ports")),
1401 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1402 }
1403 if service["type"] == "LoadBalancer":
1404 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1405 ip_list = [elem["ip"] for elem in ip_map_list]
1406 service["external_ip"] = ip_list
1407
1408 return service
1409
1410 async def _exec_inspect_comand(
1411 self, inspect_command: str, kdu_model: str, repo_url: str = None
1412 ):
1413 """
1414 Obtains information about a kdu, no cluster (no env)
1415 """
1416
1417 repo_str = ""
1418 if repo_url:
1419 repo_str = " --repo {}".format(repo_url)
1420
1421 idx = kdu_model.find("/")
1422 if idx >= 0:
1423 idx += 1
1424 kdu_model = kdu_model[idx:]
1425
1426 version = ""
1427 if ":" in kdu_model:
1428 parts = kdu_model.split(sep=":")
1429 if len(parts) == 2:
1430 version = "--version {}".format(str(parts[1]))
1431 kdu_model = parts[0]
1432
1433 full_command = self._get_inspect_command(
1434 inspect_command, kdu_model, repo_str, version
1435 )
1436 output, _rc = await self._local_async_exec(
1437 command=full_command, encode_utf8=True
1438 )
1439
1440 return output
1441
1442 async def _store_status(
1443 self,
1444 cluster_id: str,
1445 operation: str,
1446 kdu_instance: str,
1447 namespace: str = None,
1448 db_dict: dict = None,
1449 ) -> None:
1450 """
1451 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1452
1453 :param cluster_id (str): the cluster where the KDU instance is deployed
1454 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1455 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1456 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1457 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1458 values for the keys:
1459 - "collection": The Mongo DB collection to write to
1460 - "filter": The query filter to use in the update process
1461 - "path": The dot separated keys which targets the object to be updated
1462 Defaults to None.
1463 """
1464
1465 try:
1466 detailed_status = await self._status_kdu(
1467 cluster_id=cluster_id,
1468 kdu_instance=kdu_instance,
1469 yaml_format=False,
1470 namespace=namespace,
1471 )
1472
1473 status = detailed_status.get("info").get("description")
1474 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1475
1476 # write status to db
1477 result = await self.write_app_status_to_db(
1478 db_dict=db_dict,
1479 status=str(status),
1480 detailed_status=str(detailed_status),
1481 operation=operation,
1482 )
1483
1484 if not result:
1485 self.log.info("Error writing in database. Task exiting...")
1486
1487 except asyncio.CancelledError as e:
1488 self.log.warning(
1489 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1490 )
1491 except Exception as e:
1492 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1493
1494 # params for use in -f file
1495 # returns values file option and filename (in order to delete it at the end)
1496 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1497
1498 if params and len(params) > 0:
1499 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1500
1501 def get_random_number():
1502 r = random.randrange(start=1, stop=99999999)
1503 s = str(r)
1504 while len(s) < 10:
1505 s = "0" + s
1506 return s
1507
1508 params2 = dict()
1509 for key in params:
1510 value = params.get(key)
1511 if "!!yaml" in str(value):
1512 value = yaml.load(value[7:])
1513 params2[key] = value
1514
1515 values_file = get_random_number() + ".yaml"
1516 with open(values_file, "w") as stream:
1517 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1518
1519 return "-f {}".format(values_file), values_file
1520
1521 return "", None
1522
1523 # params for use in --set option
1524 @staticmethod
1525 def _params_to_set_option(params: dict) -> str:
1526 params_str = ""
1527 if params and len(params) > 0:
1528 start = True
1529 for key in params:
1530 value = params.get(key, None)
1531 if value is not None:
1532 if start:
1533 params_str += "--set "
1534 start = False
1535 else:
1536 params_str += ","
1537 params_str += "{}={}".format(key, value)
1538 return params_str
1539
1540 @staticmethod
1541 def generate_kdu_instance_name(**kwargs):
1542 chart_name = kwargs["kdu_model"]
1543 # check embeded chart (file or dir)
1544 if chart_name.startswith("/"):
1545 # extract file or directory name
1546 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1547 # check URL
1548 elif "://" in chart_name:
1549 # extract last portion of URL
1550 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1551
1552 name = ""
1553 for c in chart_name:
1554 if c.isalpha() or c.isnumeric():
1555 name += c
1556 else:
1557 name += "-"
1558 if len(name) > 35:
1559 name = name[0:35]
1560
1561 # if does not start with alpha character, prefix 'a'
1562 if not name[0].isalpha():
1563 name = "a" + name
1564
1565 name += "-"
1566
1567 def get_random_number():
1568 r = random.randrange(start=1, stop=99999999)
1569 s = str(r)
1570 s = s.rjust(10, "0")
1571 return s
1572
1573 name = name + get_random_number()
1574 return name.lower()
1575
1576 def _split_version(self, kdu_model: str) -> (str, str):
1577 version = None
1578 if ":" in kdu_model:
1579 parts = kdu_model.split(sep=":")
1580 if len(parts) == 2:
1581 version = str(parts[1])
1582 kdu_model = parts[0]
1583 return kdu_model, version
1584
1585 async def _split_repo(self, kdu_model: str) -> str:
1586 repo_name = None
1587 idx = kdu_model.find("/")
1588 if idx >= 0:
1589 repo_name = kdu_model[:idx]
1590 return repo_name
1591
1592 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1593 repo_url = None
1594 idx = kdu_model.find("/")
1595 if idx >= 0:
1596 repo_name = kdu_model[:idx]
1597 # Find repository link
1598 local_repo_list = await self.repo_list(cluster_uuid)
1599 for repo in local_repo_list:
1600 repo_url = repo["url"] if repo["name"] == repo_name else None
1601 return repo_url