703bd737cc5ced06b2021ae00267ca45d1baca79
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import os
30 import yaml
31 from uuid import uuid4
32
33 from n2vc.config import EnvironConfig
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45
46 service_account = "osm"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Builds the state shared by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # the generic K8s connector receives db, logger and callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # seed the module-level generator used for release name generation
    random.seed(time.time())

    self.fs = fs

    # kubectl must be installed (the check is asked to raise otherwise)
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    # helm must be installed as well
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url comes from the config; the literal text "None" means "not set"
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
91
92 def _get_namespace(self, cluster_uuid: str) -> str:
93 """
94 Obtains the namespace used by the cluster with the uuid passed by argument
95
96 param: cluster_uuid: cluster's uuid
97 """
98
99 # first, obtain the cluster corresponding to the uuid passed by argument
100 k8scluster = self.db.get_one(
101 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
102 )
103 return k8scluster.get("namespace")
104
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """

    cluster_id = reuse_cluster_uuid or str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The credentials file must be readable/writable by the owner only.
    # The third positional argument of the builtin open() is the buffer size,
    # not the permission bits, so the file is created through os.open(), which
    # applies the mode at creation time.
    kube_config = paths["kube_config"]
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(kube_config, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_rw)
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # the creation mode is not applied to a file that already existed
    # (cluster reuse), so the permissions are enforced explicitly as well
    os.chmod(kube_config, owner_rw)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
152
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Registers a helm repository in the helm environment of a cluster.

    The repositories are refreshed first ("helm repo update", whose failure is
    ignored) and then the repository is added ("helm repo add", whose failure
    raises).

    :param cluster_uuid: the cluster whose helm environment is used
    :param name: name given to the repository
    :param url: URL of the repository
    :param repo_type: only used in the debug message
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    command = "env KUBECONFIG={} {} repo update".format(
        paths["kube_config"], self._helm_command
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    # name and url are externally supplied values: quote them so that spaces or
    # shell metacharacters cannot split or extend the command line
    command = "env KUBECONFIG={} {} repo add {} {}".format(
        paths["kube_config"],
        self._helm_command,
        shlex.quote(name),
        shlex.quote(url),
    )
    self.log.debug("adding repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
190
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the cluster whose helm environment is queried
    :return: the parsed output of "helm repo list" with all keys lowercased,
        or an empty list when the command fails or prints nothing
    """

    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # bring the local directory up to date
    self.fs.sync(from_path=cluster_uuid)

    list_command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # errors are not raised: having no repositories just means an empty list
    output, _rc = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    # push the local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    if _rc != 0 or not output:
        return []

    # unify format between helm2 and helm3 setting all keys lowercase
    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    return self._lower_keys_list(parsed_repos)
229
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Removes a helm repository from the helm environment of a cluster.

    A failure of the "helm repo remove" command raises.

    :param cluster_uuid: the cluster whose helm environment is used
    :param name: name of the repository to remove
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the repository name is an externally supplied value: quote it so that
    # spaces or shell metacharacters cannot split or extend the command line
    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
252
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
        instead of blocking the software removal
    :param uninstall_sw: Boolean to request the removal of the software installed
        in the cluster. It is skipped when the cluster still has releases and
        force is not set
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for release in releases:
                    # bound before the try so the handler can always report it
                    kdu_instance = None
                    try:
                        kdu_instance = release.get("name")
                        chart = release.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_uuid)
                self.log.warning(msg)
                # Allow to remove k8s cluster without removing Tiller
                uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
324
325 async def _install_impl(
326 self,
327 cluster_id: str,
328 kdu_model: str,
329 paths: dict,
330 env: dict,
331 kdu_instance: str,
332 atomic: bool = True,
333 timeout: float = 300,
334 params: dict = None,
335 db_dict: dict = None,
336 kdu_name: str = None,
337 namespace: str = None,
338 ):
339 # init env, paths
340 paths, env = self._init_paths_env(
341 cluster_name=cluster_id, create_if_not_exist=True
342 )
343
344 # params to str
345 params_str, file_to_delete = self._params_to_file_option(
346 cluster_id=cluster_id, params=params
347 )
348
349 # version
350 kdu_model, version = self._split_version(kdu_model)
351
352 command = self._get_install_command(
353 kdu_model,
354 kdu_instance,
355 namespace,
356 params_str,
357 version,
358 atomic,
359 timeout,
360 paths["kube_config"],
361 )
362
363 self.log.debug("installing: {}".format(command))
364
365 if atomic:
366 # exec helm in a task
367 exec_task = asyncio.ensure_future(
368 coro_or_future=self._local_async_exec(
369 command=command, raise_exception_on_error=False, env=env
370 )
371 )
372
373 # write status in another task
374 status_task = asyncio.ensure_future(
375 coro_or_future=self._store_status(
376 cluster_id=cluster_id,
377 kdu_instance=kdu_instance,
378 namespace=namespace,
379 db_dict=db_dict,
380 operation="install",
381 run_once=False,
382 )
383 )
384
385 # wait for execution task
386 await asyncio.wait([exec_task])
387
388 # cancel status task
389 status_task.cancel()
390
391 output, rc = exec_task.result()
392
393 else:
394
395 output, rc = await self._local_async_exec(
396 command=command, raise_exception_on_error=False, env=env
397 )
398
399 # remove temporal values yaml file
400 if file_to_delete:
401 os.remove(file_to_delete)
402
403 # write final status
404 await self._store_status(
405 cluster_id=cluster_id,
406 kdu_instance=kdu_instance,
407 namespace=namespace,
408 db_dict=db_dict,
409 operation="install",
410 run_once=True,
411 check_every=0,
412 )
413
414 if rc != 0:
415 msg = "Error executing command: {}\nOutput: {}".format(command, output)
416 self.log.error(msg)
417 raise K8sException(msg)
418
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrades an existing KDU instance with the helm upgrade command.

    :param cluster_uuid: cluster where the KDU instance lives
    :param kdu_instance: release name
    :param kdu_model: chart reference, optionally carrying a version
    :param atomic: when set, the status is also written by a background task
        while the upgrade command is running
    :param timeout: passed to the upgrade command builder
    :param params: values written to a temporary file for the command
    :param db_dict: passed through to the status writer
    :return: revision of the instance after the upgrade, or 0 if the instance
        cannot be found afterwards
    :raises K8sException: if the instance does not exist or the upgrade command
        ends with a non-zero code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    try:
        # version
        kdu_model, version = self._split_version(kdu_model)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # never leave the status writer running, even when this
                # coroutine is cancelled while waiting
                status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also when the command raised
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0
    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
534
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    cluster_uuid: str = None,
    kdu_model: str = None,
    atomic: bool = True,
    db_dict: dict = None,
    **kwargs,
):
    """Scale a resource in a Helm Chart.

    Args:
        kdu_instance: KDU instance name
        scale: Scale to which to set the resource
        resource_name: Resource name
        total_timeout: The time, in seconds, to wait
        cluster_uuid: The UUID of the cluster
        kdu_model: The chart reference
        atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        db_dict: Dictionary for any additional data
        kwargs: Additional parameters

    Returns:
        True if successful

    Raises:
        K8sException: if the instance or its repository is not found, or the
            scale command ends with a non-zero code
    """

    if resource_name:
        debug_msg = "scaling resource {} in model {} (cluster {})".format(
            resource_name, kdu_model, cluster_uuid
        )
    else:
        debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
    self.log.debug(debug_msg)

    # look for instance to obtain namespace
    # get_instance_info function calls the sync command
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # version
    kdu_model, version = self._split_version(kdu_model)

    repo_url = await self._find_repo(kdu_model, cluster_uuid)
    if not repo_url:
        raise K8sException(
            "Repository not found for kdu_model {}".format(kdu_model)
        )

    _, replica_str = await self._get_replica_count_url(
        kdu_model, repo_url, resource_name
    )

    command = self._get_upgrade_scale_command(
        kdu_model,
        kdu_instance,
        instance_info["namespace"],
        scale,
        version,
        atomic,
        replica_str,
        total_timeout,
        resource_name,
        paths["kube_config"],
    )

    self.log.debug("scaling: {}".format(command))

    if atomic:
        # exec helm in a task
        exec_task = asyncio.ensure_future(
            self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="scale",
                run_once=False,
            )
        )

        try:
            # wait for execution task
            await asyncio.wait([exec_task])
        finally:
            # never leave the status writer running, even when this
            # coroutine is cancelled while waiting
            status_task.cancel()
        output, rc = exec_task.result()

    else:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="scale",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return True
663
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    cluster_uuid: str,
    kdu_model: str,
    **kwargs,
) -> int:
    """Get a resource scale count.

    Args:
        cluster_uuid: The UUID of the cluster
        resource_name: Resource name
        kdu_instance: KDU instance name
        kdu_model: The name or path of a bundle
        kwargs: Additional parameters

    Returns:
        Resource instance count

    Raises:
        K8sException: if the instance or its repository is not found, or no
            replica count can be obtained
    """

    def _is_missing(value) -> bool:
        # 0 is a legitimate replica count (resource scaled to zero); only an
        # absent value means "not found"
        return value is None or value == ""

    self.log.debug(
        "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
    )

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    replicas = await self._get_replica_count_instance(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )

    # Get default value if scale count is not found from provided values
    if _is_missing(replicas):
        repo_url = await self._find_repo(kdu_model, cluster_uuid)
        if not repo_url:
            raise K8sException(
                "Repository not found for kdu_model {}".format(kdu_model)
            )

        replicas, _ = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        if _is_missing(replicas):
            msg = "Replica count not found. Cannot be scaled"
            self.log.error(msg)
            raise K8sException(msg)

    return int(replicas)
721
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Rolls a KDU instance back to a previous revision.

    :param cluster_uuid: cluster where the KDU instance lives
    :param kdu_instance: release name
    :param revision: revision passed to the rollback command builder
    :param db_dict: passed through to the status writer
    :return: revision of the instance after the rollback, or 0 if the instance
        cannot be found afterwards
    :raises K8sException: if the instance does not exist or the rollback command
        ends with a non-zero code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = self._get_rollback_command(
        kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
    )

    self.log.debug("rolling_back: {}".format(command))

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    try:
        # wait for execution task
        await asyncio.wait([exec_task])
    finally:
        # never leave the status writer running, even when this coroutine is
        # cancelled while waiting
        status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0
    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
808
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True if the instance is not found (nothing to remove); otherwise
        the output of the uninstall command converted by `_output_to_table`
    """

    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_uuid
        )
    )

    # bring the local directory up to date
    self.fs.sync(from_path=cluster_uuid)

    # the namespace comes from the deployed instance; a missing instance is
    # not an error
    release_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release_info:
        not_found_msg = "kdu_instance {} not found".format(kdu_instance)
        self.log.warning(not_found_msg)
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # bring the local directory up to date again
    self.fs.sync(from_path=cluster_uuid)

    uninstall_command = self._get_uninstall_command(
        kdu_instance, release_info["namespace"], paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=uninstall_command, raise_exception_on_error=True, env=env
    )

    # push the local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(output)
854
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    The local directory is synchronized before, and synchronized back after,
    running the helm-version specific listing.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the helm-version specific `_instances_list` returns
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    self.fs.sync(from_path=cluster_uuid)
    releases = await self._instances_list(cluster_uuid)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
875
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Looks up a deployed release by name.

    :param cluster_uuid: the cluster whose releases are listed
    :param kdu_instance: release name to look for
    :return: the first listed release whose "name" equals kdu_instance, or
        None when there is none
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
883
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Not available for helm: this method always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    unsupported_msg = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported_msg)
910
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
            name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # bring the local directory up to date
    self.fs.sync(from_path=cluster_uuid)

    # names of the services that belong to the kdu
    service_names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    # resolve every name, one after another, into its service description
    service_list = [
        await self._get_service(cluster_uuid, service_name, namespace)
        for service_name in service_names
    ]

    # push the local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
958
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtains the data of a single service.

    The local directory is synchronized before, and synchronized back after,
    the lookup.

    :param cluster_uuid: the cluster where the service lives
    :param service_name: name of the service
    :param namespace: namespace of the service
    :return: whatever `_get_service` returns for that service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    self.fs.sync(from_path=cluster_uuid)
    service_data = await self._get_service(cluster_uuid, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_data
978
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    This call retrieves the current state of a given KDU instance. It allows
    to retrieve the _composition_ (i.e. K8s objects) and _specific values_ of
    the configuration parameters applied to a given instance. This call is
    based on the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: If successful, it will return the following vector of arguments:
    - K8s `namespace` in the cluster where the KDU lives
    - `state` of the KDU instance. It can be:
          - UNKNOWN
          - DEPLOYED
          - DELETED
          - SUPERSEDED
          - FAILED or
          - DELETING
    - List of `resources` (objects) that this release consists of, sorted by kind,
      and the status of those resources
    - Last `deployment_time`.
    :raises K8sException: if the instance is not among the releases of the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # bring the local directory up to date
    self.fs.sync(from_path=cluster_uuid)

    # the deployed instance is needed to obtain its namespace
    releases = await self._instances_list(cluster_id=cluster_uuid)
    release = next(
        (item for item in releases if item.get("name") == kdu_instance), None
    )
    if release is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # push the local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
1037
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtains the values of a deployed KDU instance through the "values" get command.

    :param kdu_instance: release name
    :param namespace: namespace of the release
    :param kubeconfig: kube config file passed to the command
    :return: result of `_exec_get_command`
    """

    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    values = await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
    return values
1050
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspects the values of a chart through the "values" inspect command.

    :param kdu_model: chart reference
    :param repo_url: optional repository of the chart
    :return: result of `_exec_inspect_command`
    """

    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    chart_values = await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_values
1062
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspects the readme of a chart through the "readme" inspect command.

    :param kdu_model: chart reference
    :param repo_url: optional repository of the chart
    :return: result of `_exec_inspect_command`
    """

    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    chart_readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_readme
1072
async def synchronize_repos(self, cluster_uuid: str):
    """
    Aligns the helm repositories registered for a cluster with the ones stored
    in the database for that cluster.

    Repositories of the database that are missing, or registered with another
    url, are (re)added. Registered repositories that are not in the database
    are removed, except the one named "stable".

    :param cluster_uuid: the cluster whose repositories are synchronized
    :return: tuple (deleted_repo_list, added_repo_dict). added_repo_dict maps
        the database id of every added repository to its name
    :raises K8sException: if a repository of the database cannot be added
    :raises Exception: on any other error found while synchronizing
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # bound before the try so the error message can always report it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and and again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    # NOTE(review): a repo name is recorded here, while the
                    # loop above records database ids -- confirm what the
                    # callers expect
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # a repository that cannot be removed is only reported
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other error is logged and raised as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
1138
def _get_db_repos_dict(self, repo_ids: list):
    """
    Load the given repos from the "k8srepos" collection, indexed by repo name.

    :param repo_ids: values matched against the "_id" field of each repo
    :return: dictionary mapping each repo "name" to its database record
    """
    records = (self.db.get_one("k8srepos", {"_id": repo_id}) for repo_id in repo_ids)
    return {record["name"]: record for record in records}
1145
1146 """
1147 ####################################################################################
1148 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1149 ####################################################################################
1150 """
1151
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Prepare the directories and the helm environment used to work with a cluster.

    Creates the base cluster and kube dirs and returns them. Helm3 dirs are also
    created according to the new directory specification; those paths are not
    returned but assigned to helm environment variables.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably whether missing directories are
        created; the exact behaviour is defined by each subclass
    :return: Dictionary with config_paths and dictionary with helm environment
        variables. Callers in this class unpack the result as (paths, env) and
        read paths["kube_config"]
    """
1162
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    :param cluster_id: identifier of the cluster to initialize
    :param namespace: namespace to use for the initialization
    :param paths: presumably the paths dictionary produced by _init_paths_env
    :param env: presumably the environment dictionary produced by _init_paths_env
    """
1168
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    :param cluster_id: identifier of the cluster whose helm instances are listed
    """
1174
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    :param cluster_id: identifier of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param kubeconfig: presumably the path of the kubeconfig file to use
    """
1180
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Implements the helm version dependent method to obtain status of a helm instance

    :param cluster_id: identifier of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param show_error_log: presumably whether a failing command is logged
    :param return_text: presumably selects a plain text result; when False the
        caller in this class reads result.get("info").get("description")
    """
1193
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance

    :param kdu_model: chart to install
    :param kdu_instance: name given to the helm instance
    :param namespace: namespace where the instance is installed
    :param params_str: presumably the already formatted values options
    :param version: chart version option
    :param atomic: presumably whether the atomic flag is requested
    :param timeout: timeout for the operation
    :param kubeconfig: presumably the path of the kubeconfig file to use
    :return: the command line as a string
    """
1209
@abc.abstractmethod
def _get_upgrade_scale_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    count,
    version,
    atomic,
    replicas,
    timeout,
    resource_name,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance.

    Judging by the method name and the count/replicas/resource_name parameters
    this is the upgrade used for scaling; the exact use of each parameter is
    defined by the subclass.

    :return: the command line as a string
    """
1225
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    :param kdu_model: chart to upgrade to
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param params_str: presumably the already formatted values options
    :param version: chart version option
    :param atomic: presumably whether the atomic flag is requested
    :param timeout: timeout for the operation
    :param kubeconfig: presumably the path of the kubeconfig file to use
    :return: the command line as a string
    """
1241
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param revision: revision to roll back to
    :param kubeconfig: presumably the path of the kubeconfig file to use
    :return: the command line as a string
    """
1249
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param kubeconfig: presumably the path of the kubeconfig file to use
    :return: the command line as a string
    """
1257
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain command to be executed to obtain information about the kdu

    :param show_command: inspect sub-command; this class passes "values" and
        "readme"
    :param kdu_model: chart name, as prepared by _exec_inspect_command
    :param repo_str: "" or an already formatted " --repo <url>" option
    :param version: "" or an already formatted "--version <version>" option
    :return: the command line, executed by the caller with _local_async_exec
    """
1265
@abc.abstractmethod
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Obtain command to be executed to get information about the kdu instance.

    :param get_command: sub-command selecting the information to retrieve
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param kubeconfig: presumably the path of the kubeconfig file to use
    :return: the command line, executed by the caller with _local_async_exec
    """
1271
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    of helm version
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: identifier of the cluster
    :param namespace: namespace where the helm software is installed
    """
1280
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    :param cluster_uuid: identifier of the cluster
    :return: list of identifiers; synchronize_repos looks each of them up as the
        "_id" of a record in the "k8srepos" collection
    """
1286
1287 """
1288 ####################################################################################
1289 ################################### P R I V A T E ##################################
1290 ####################################################################################
1291 """
1292
@staticmethod
def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
    """
    Tell whether a path exists, optionally failing when it does not.

    :param filename: path to check
    :param exception_if_not_exists: raise instead of returning False when the
        path is missing
    :return: True if the path exists, False otherwise
    :raises K8sException: if the path is missing and exception_if_not_exists is set
    """
    if os.path.exists(filename):
        return True
    if exception_if_not_exists:
        raise K8sException("File {} does not exist".format(filename))
    # explicit boolean instead of the implicit None of a bare fall-through
    return False
1301
@staticmethod
def _remove_multiple_spaces(strobj):
    """
    Strip a string and collapse every run of spaces into a single space.

    Only the space character is collapsed; other whitespace inside the string
    is left untouched.

    :param strobj: text to normalize
    :return: the normalized text
    """
    # split/join collapses runs of spaces in one pass and cannot loop forever,
    # unlike repeatedly replacing a space sequence until none is left
    return " ".join(piece for piece in strobj.strip().split(" ") if piece)
1308
@staticmethod
def _output_to_lines(output: str) -> list:
    """
    Split a command output into its non-empty lines.

    :param output: raw text
    :return: list with every line stripped, lines that end up empty are dropped
    """
    stripped = (raw.strip() for raw in output.splitlines())
    return [text for text in stripped if text]
1318
@staticmethod
def _output_to_table(output: str) -> list:
    """
    Turn a tabular command output into a list of rows.

    Tabs are treated as spaces and each line is cut at spaces; cells are
    stripped and empty cells are dropped. A blank line yields an empty row.

    :param output: raw text
    :return: list of rows, each row being the list of its cells
    """
    rows = []
    for raw in output.splitlines():
        pieces = (piece.strip() for piece in raw.replace("\t", " ").split(" "))
        rows.append([piece for piece in pieces if piece])
    return rows
1333
@staticmethod
def _parse_services(output: str) -> list:
    """
    Extract service names from a text listing.

    A line contributes a name when its first space/tab separated token starts
    with "service/"; the name is the text between the first and second "/".

    :param output: raw text
    :return: list of service names in order of appearance
    """
    names = []
    for raw in output.splitlines():
        first_cell = raw.replace("\t", " ").split(" ")[0]
        if first_cell.startswith("service/"):
            names.append(first_cell.split("/")[1])
    return names
1346
@staticmethod
def _get_deep(dictionary: dict, members: tuple):
    """
    Walk nested mappings following a path of keys.

    :param dictionary: mapping to start from (None is tolerated)
    :param members: keys to follow, outermost first
    :return: the value found at the end of the path, or None when any step is
        missing, holds a falsy value (None, 0, "", empty container) or goes
        through something that is not a mapping
    """
    target = dictionary
    value = None
    try:
        for member in members:
            value = target.get(member)
            if not value:
                return None
            target = value
    except (AttributeError, TypeError):
        # the path cannot be followed (non-mapping step, unusable key or path):
        # report "not found" instead of leaking the last intermediate value
        return None
    return value
1361
1362 # find key:value in several lines
@staticmethod
def _find_in_lines(p_lines: list, p_key: str) -> str:
    """
    Return the value of the first "key: value" line whose key is p_key.

    :param p_lines: lines to search; entries that are not strings are skipped
    :param p_key: key expected at the very beginning of the line
    :return: everything after the first "key:" of the matching line, stripped
        (colons inside the value are kept), or None when no line matches
    """
    if not isinstance(p_key, str):
        return None
    prefix = p_key + ":"
    for line in p_lines:
        if isinstance(line, str) and line.startswith(prefix):
            # slice instead of split(":") so values such as timestamps or
            # urls are not cut at their first colon
            return line[len(prefix):].strip()
    return None
1375
@staticmethod
def _lower_keys_list(input_list: list):
    """
    Build a new list of dictionaries whose keys are the lower-cased keys of the
    dictionaries in input_list. A falsy input_list produces an empty list.
    """
    return [
        {key.lower(): value for key, value in entry.items()}
        for entry in (input_list or [])
    ]
1388
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command asynchronously and collect its output.

    The command line is normalized with _remove_multiple_spaces, split with
    shlex and executed without a shell, with env layered on top of a copy of
    the current process environment.

    :param command: command line to execute
    :param raise_exception_on_error: raise K8sException when the command exits
        with a non-zero code or cannot be executed
    :param show_error_log: log the output of a failing command
    :param encode_utf8: post-process the output through bytes and str(); see
        the note in the body
    :param env: extra environment variables for the command
    :return: tuple (output, return_code). For a successful command the output
        is stdout (stderr only if stdout is empty); for a failing one it is
        stderr (stdout only if stderr is empty). ("", -1) is returned when the
        command cannot be executed and raise_exception_on_error is not set
    :raises K8sException: see raise_exception_on_error
    """

    command = self._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""
        # a successful command may still print warnings on stderr; they must
        # not replace the real result, which callers go on to parse
        if return_code == 0:
            output = out_text or err_text
        else:
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            # NOTE(review): str() of a bytes object yields its literal form
            # (b'...'), so the returned text carries that wrapper; kept as is
            # because callers may rely on it - confirm before changing
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1458
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Run "command1 | command2" asynchronously, without a shell.

    The standard output of command1 is connected to the standard input of
    command2 through an os.pipe(). Only command2 is waited for: its standard
    output and return code make up the result. The return code of command1 is
    not checked.

    :param command1: command line producing the data
    :param command2: command line consuming the data
    :param raise_exception_on_error: raise K8sException when command2 exits
        with a non-zero code or a command cannot be executed
    :param show_error_log: log the output of a failing command
    :param encode_utf8: post-process the output through bytes and str(), which
        leaves the bytes literal wrapper (b'...') in the returned text
    :param env: extra environment variables for both commands
    :return: tuple (output, return_code), or ("", -1) when a command cannot be
        executed and raise_exception_on_error is not set
    :raises K8sException: see raise_exception_on_error
    """

    command1 = self._remove_multiple_spaces(command1)
    command2 = self._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # pipe descriptors still owned by this process; whatever is left here is
    # closed in the finally clause so a failed spawn does not leak them
    open_fds = []
    try:
        read, write = os.pipe()
        open_fds = [read, write]
        await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
        # the child holds its own copy; closing ours lets command2 see EOF
        open_fds.remove(write)
        os.close(write)
        process_2 = await asyncio.create_subprocess_exec(
            *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
        )
        open_fds.remove(read)
        os.close(read)
        stdout, stderr = await process_2.communicate()

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        # stderr of command2 is not piped, so this only applies if that changes
        if stderr:
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
    finally:
        for fd in open_fds:
            os.close(fd)
1530
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; it is empty while no ingress
    ip has been assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None while the load balancer has no ingress yet,
        # and an ingress entry is not guaranteed to carry an "ip"
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        service["external_ip"] = [
            elem["ip"] for elem in (ip_map_list or []) if elem.get("ip")
        ]

    return service
1574
async def _exec_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Run the "get" command built by _get_get_command for a kdu instance.

    :return: the output of the command; its return code is discarded
    """
    command_line = self._get_get_command(
        get_command, kdu_instance, namespace, kubeconfig
    )
    text, _ = await self._local_async_exec(command=command_line)
    return text
1587
async def _exec_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Run an inspect command for a kdu, no cluster (no env).

    The text up to the first "/" of kdu_model is dropped and a trailing
    ":version" is split off with _split_version before the command is built
    with _get_inspect_command.

    :param inspect_command: inspect sub-command to run
    :param kdu_model: chart reference, optionally "repo/chart:version"
    :param repo_url: optional repository url, passed as a " --repo" option
    :return: the output of the command; its return code is discarded
    """
    repo_option = " --repo {}".format(repo_url) if repo_url else ""

    _, slash, after_slash = kdu_model.partition("/")
    if slash:
        kdu_model = after_slash

    kdu_model, version = self._split_version(kdu_model)
    version_option = "--version {}".format(version) if version else ""

    command_line = self._get_inspect_command(
        inspect_command, kdu_model, repo_option, version_option
    )
    text, _ = await self._local_async_exec(command=command_line)
    return text
1615
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str,
    resource_name: str = None,
):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of a bundle
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica setting is read
            from that entry of the values instead of from the top level

    Returns:
        Tuple (replicas, replica_str): the value found and the name of the
        key holding it, either "replicaCount" or "replicas"

    Raises:
        K8sException: if the values or the resource are empty or missing, if
            neither key holds a value, or if both keys are present
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
    )

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    present_keys = [
        key for key in ("replicaCount", "replicas") if key in kdu_values
    ]

    # Control if replicas and replicaCount exists at the same time
    if len(present_keys) == 2:
        msg = "replicaCount and replicas are exists at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    # compare against None so an explicit count of 0 is a valid setting
    if not present_keys or kdu_values[present_keys[0]] is None:
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    replica_str = present_keys[0]
    return kdu_values[replica_str], replica_str
1692
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
):
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: forwarded to get_values_kdu
        resource_name: Resource name

    Returns:
        The "replicaCount" value, or else the "replicas" value, read from the
        resource entry of the instance values when resource_name is given and
        that entry is not empty, and from the top level of the values
        otherwise. None when the instance has no values or no such setting.
    """
    raw_values = await self.get_values_kdu(kdu_instance, namespace, kubeconfig)
    kdu_values = yaml.load(raw_values, Loader=yaml.SafeLoader)

    if not kdu_values:
        return None

    scoped_values = kdu_values.get(resource_name, None) if resource_name else None
    # an empty or missing resource entry falls back to the top level values
    source = scoped_values if scoped_values else kdu_values
    return source.get("replicaCount", None) or source.get("replicas", None)
1736
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Periodically read the status of a KDU instance and write it to the database.

    Each round sleeps check_every seconds, obtains the status with _status_kdu
    and stores it with write_app_status_to_db. The loop ends when the database
    write reports a failure, when the task is cancelled, or after the first
    round if run_once is set. Errors raised during a round are logged and the
    loop goes on.

    :param cluster_id: identifier of the cluster
    :param operation: operation name stored along with the status
    :param kdu_instance: name of the KDU instance
    :param namespace: namespace of the KDU instance
    :param check_every: seconds to sleep before each status check
    :param db_dict: passed through to write_app_status_to_db
    :param run_once: perform a single round instead of looping
    :return: None
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                return_text=False,
            )
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )
        # checked after the try block rather than in a finally clause: a
        # return inside finally would silently discard exceptions that are
        # not handled above (KeyboardInterrupt, SystemExit, ...)
        if run_once:
            return
1779
1780 # params for use in -f file
1781 # returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump params into a values file to be passed to helm with the -f option.

    String values containing the "!!yaml" marker have their first 7 characters
    dropped and the remainder parsed as YAML; every other value is written
    unchanged. The file gets a random numeric name with the ".yaml" extension
    and is created relative to the current working directory; the caller is
    expected to delete it.

    :param cluster_id: cluster whose paths and environment are initialized
        before the file is written
    :param params: values to dump
    :return: tuple ("-f <file>", "<file>"), or ("", None) when params is empty
    """
    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    params2 = dict()
    for key, value in params.items():
        # only strings can carry the marker; slicing any other type whose
        # text happens to contain it would fail
        if isinstance(value, str) and "!!yaml" in value:
            # params come from the user: parse with the safe loader, as the
            # rest of this class does (a Loader is mandatory since PyYAML 6)
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        params2[key] = value

    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    values_file = random_number + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1808
1809 # params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
    """
    Format params as a single helm "--set key=value,key=value" option.

    Entries whose value is None are left out. An empty string is returned when
    there is nothing to set.
    """
    if not params:
        return ""
    assignments = [
        "{}={}".format(key, params.get(key, None))
        for key in params
        if params.get(key, None) is not None
    ]
    if not assignments:
        return ""
    return "--set " + ",".join(assignments)
1825
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Build an instance name out of the chart name plus a random suffix.

    For an absolute path or a url only the last path component is used (a
    trailing "/" is ignored). Characters that are neither letters nor digits
    become "-", the result is cut to 35 characters and prefixed with "a" when
    it is empty or does not start with a letter. A "-" and a 10 digit, zero
    padded random number are appended and the whole name is lower-cased.

    :param kwargs: must contain "kdu_model", the chart reference
    :return: the generated name
    :raises KeyError: if "kdu_model" is not provided
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep the last portion only
    if chart_name.startswith("/") or "://" in chart_name:
        # without this a trailing "/" would leave an empty chart name
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1:]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )[:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_number).lower()
1861
def _split_version(self, kdu_model: str) -> (str, str):
    """
    Split a "chart:version" reference into chart and version.

    The reference is only split when it contains exactly one ":" and is not a
    url, whose scheme separator must not be taken for a version delimiter.

    :param kdu_model: chart reference, optionally followed by ":version"
    :return: tuple (kdu_model, version); version is None when kdu_model
        carries none, in which case kdu_model is returned unchanged
    """
    version = None
    if ":" in kdu_model and "://" not in kdu_model:
        parts = kdu_model.split(sep=":")
        if len(parts) == 2:
            version = str(parts[1])
            kdu_model = parts[0]
    return kdu_model, version
1870
async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
    """
    Find the url of the repo a "repo/chart" reference points to.

    :param kdu_model: chart reference; the text before the first "/" is taken
        as the repo name
    :param cluster_uuid: cluster whose repo list is searched
    :return: url of the first listed repo with that name, or None when
        kdu_model has no "/" or no repo matches
    """
    idx = kdu_model.find("/")
    if idx < 0:
        return None

    repo_name = kdu_model[:idx]
    # Find repository link
    local_repo_list = await self.repo_list(cluster_uuid)
    for repo in local_repo_list:
        # stop at the match: going on would let later repos reset the result
        if repo["name"] == repo_name:
            return repo["url"]
    return None