273b206ddb13eb8027773db228e8f45b1884bc02
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import os
30 import yaml
31 from uuid import uuid4
32
33 from n2vc.config import EnvironConfig
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45
# Class-level service account name. It is not referenced in the visible part
# of this class; presumably it is consumed by the helm-version specific
# code outside this view -- TODO confirm.
service_account = "osm"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Set up the state shared by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # the parent connector receives the db, the logger and the db callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # random numbers are used for release name generation; seed with the clock
    random.seed(time.time())

    # the file system
    self.fs = fs

    # each executable is stored and then checked; the check is asked to raise
    # (exception_if_not_exists=True) when the file is missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url comes from the config; the literal string "None"
    # is mapped to a real None
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
91
92 @staticmethod
93 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
94 """
95 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
96 cluster_id for backward compatibility
97 """
98 namespace, _, cluster_id = cluster_uuid.rpartition(":")
99 return namespace, cluster_id
100
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse. When it carries
        a namespace ('namespace:cluster_id') that namespace takes precedence
        over the namespace argument
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster ('namespace:cluster_id') and the value
        returned by the helm-version specific _cluster_init (True if connector
        has installed some software in the cluster)
        (on error, an exception will be raised)
    """

    if reuse_cluster_uuid:
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The kubeconfig holds credentials, so it is created with owner-only
    # permissions from the start. The third positional argument of the
    # builtin open() is the buffer size, not the permission bits, so
    # os.open() is used to pass the mode at creation time.
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(
        paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode
    )
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # the creation mode is ignored for a file that already existed (and is
    # masked by the umask), so the permissions are enforced explicitly too
    os.chmod(paths["kube_config"], mode)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
150
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Add a helm repository to the environment of a cluster.

    'helm repo update' is run first (its failure is tolerated) and then
    'helm repo add' (its failure raises through _local_async_exec). The local
    cluster directory is synced before and reverse-synced after the commands.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name to register the repository under
    :param url: URL of the repository
    :param repo_type: only used in the debug log message
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # helm repo update
    command = "env KUBECONFIG={} {} repo update".format(
        paths["kube_config"], self._helm_command
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    # name and url are caller-provided: quote them so that whitespace or
    # shell metacharacters cannot inject extra arguments into the command.
    # Values made only of safe characters (typical names/URLs) are unchanged.
    # NOTE(review): presumes _local_async_exec tokenizes the command with
    # shell-like quoting rules (this module imports shlex) -- confirm.
    command = "env KUBECONFIG={} {} repo add {} {}".format(
        paths["kube_config"],
        self._helm_command,
        shlex.quote(name),
        shlex.quote(url),
    )
    self.log.debug("adding repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
189
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    Runs 'helm repo list --output yaml' and parses its output. A non-zero
    return code or an empty output yields an empty list instead of an error.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: list of registered repositories, as returned by _lower_keys_list
        (synchronize_repos reads the 'name' and 'url' keys of each entry)
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    helm_args = (paths["kube_config"], self._helm_command)
    command = "env KUBECONFIG={} {} repo list --output yaml".format(*helm_args)

    # errors are not raised: if there are no repos an empty list is wanted
    output, return_code = await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    if return_code != 0 or not output:
        return []

    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed_repos)
229
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the environment of a cluster.

    Runs 'helm repo remove'; a failing command raises through
    _local_async_exec. The local cluster directory is synced before and
    reverse-synced after the command.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name of the repository to remove
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # name is caller-provided: quote it so that whitespace or shell
    # metacharacters cannot inject extra arguments into the command.
    # Names made only of safe characters are left unchanged.
    # NOTE(review): presumes _local_async_exec tokenizes the command with
    # shell-like quoting rules (this module imports shlex) -- confirm.
    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
252
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
        (errors of individual releases are only logged)
    :param uninstall_sw: Boolean to request uninstalling the releases and the
        software handled by _uninstall_sw. If releases exist and force is not
        set, nothing is uninstalled
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_id, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # bound before the try so the handler can always log it
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                # Allow to remove k8s cluster without removing Tiller
                uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_id
    shutil.rmtree(direct, ignore_errors=True)

    return True
324
325 async def _install_impl(
326 self,
327 cluster_id: str,
328 kdu_model: str,
329 paths: dict,
330 env: dict,
331 kdu_instance: str,
332 atomic: bool = True,
333 timeout: float = 300,
334 params: dict = None,
335 db_dict: dict = None,
336 kdu_name: str = None,
337 namespace: str = None,
338 ):
339 # init env, paths
340 paths, env = self._init_paths_env(
341 cluster_name=cluster_id, create_if_not_exist=True
342 )
343
344 # params to str
345 params_str, file_to_delete = self._params_to_file_option(
346 cluster_id=cluster_id, params=params
347 )
348
349 # version
350 kdu_model, version = self._split_version(kdu_model)
351
352 command = self._get_install_command(
353 kdu_model,
354 kdu_instance,
355 namespace,
356 params_str,
357 version,
358 atomic,
359 timeout,
360 paths["kube_config"],
361 )
362
363 self.log.debug("installing: {}".format(command))
364
365 if atomic:
366 # exec helm in a task
367 exec_task = asyncio.ensure_future(
368 coro_or_future=self._local_async_exec(
369 command=command, raise_exception_on_error=False, env=env
370 )
371 )
372
373 # write status in another task
374 status_task = asyncio.ensure_future(
375 coro_or_future=self._store_status(
376 cluster_id=cluster_id,
377 kdu_instance=kdu_instance,
378 namespace=namespace,
379 db_dict=db_dict,
380 operation="install",
381 run_once=False,
382 )
383 )
384
385 # wait for execution task
386 await asyncio.wait([exec_task])
387
388 # cancel status task
389 status_task.cancel()
390
391 output, rc = exec_task.result()
392
393 else:
394
395 output, rc = await self._local_async_exec(
396 command=command, raise_exception_on_error=False, env=env
397 )
398
399 # remove temporal values yaml file
400 if file_to_delete:
401 os.remove(file_to_delete)
402
403 # write final status
404 await self._store_status(
405 cluster_id=cluster_id,
406 kdu_instance=kdu_instance,
407 namespace=namespace,
408 db_dict=db_dict,
409 operation="install",
410 run_once=True,
411 check_every=0,
412 )
413
414 if rc != 0:
415 msg = "Error executing command: {}\nOutput: {}".format(command, output)
416 self.log.error(msg)
417 raise K8sException(msg)
418
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance and return its new revision.

    The namespace is taken from the currently deployed instance.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference; its version part is separated with
        _split_version
    :param atomic: passed to the upgrade command; when set, the command runs
        in a task while a second task keeps storing the status
    :param timeout: passed to the upgrade command
    :param params: values passed to the chart through _params_to_file_option
    :param db_dict: passed to _store_status
    :return: revision reported for the instance after the upgrade, or 0 when
        the instance cannot be found afterwards
    :raises K8sException: if the instance does not exist or the command
        returns a non-zero code
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version
        kdu_model, version = self._split_version(kdu_model)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # the status task must not outlive this call, even when the
                # wait is interrupted (e.g. the caller is cancelled)
                status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also when the execution failed
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
535
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    cluster_uuid: str = None,
    kdu_model: str = None,
    atomic: bool = True,
    db_dict: dict = None,
    **kwargs,
):
    """Scale a resource in a Helm Chart.

    Args:
        kdu_instance: KDU instance name
        scale: Scale to which to set the resource
        resource_name: Resource name
        total_timeout: The time, in seconds, to wait
        cluster_uuid: The UUID of the cluster. Although it has a default of
            None it is required: it is parsed unconditionally
        kdu_model: The chart reference
        atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        db_dict: Dictionary for any additional data
        kwargs: Additional parameters

    Returns:
        True if successful

    Raises:
        K8sException: if the instance or the repository of the model is not
            found, or the command returns a non-zero code
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)

    debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_id)
    if resource_name:
        debug_msg = "scaling resource {} in model {} (cluster {})".format(
            resource_name, kdu_model, cluster_id
        )

    self.log.debug(debug_msg)

    # look for instance to obtain namespace
    # get_instance_info function calls the sync command
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # version
    kdu_model, version = self._split_version(kdu_model)

    repo_url = await self._find_repo(kdu_model, cluster_uuid)
    if not repo_url:
        raise K8sException(
            "Repository not found for kdu_model {}".format(kdu_model)
        )

    # only the second element of the result is needed to build the command
    _, replica_str = await self._get_replica_count_url(
        kdu_model, repo_url, resource_name
    )

    command = self._get_upgrade_scale_command(
        kdu_model,
        kdu_instance,
        instance_info["namespace"],
        scale,
        version,
        atomic,
        replica_str,
        total_timeout,
        resource_name,
        paths["kube_config"],
    )

    self.log.debug("scaling: {}".format(command))

    if atomic:
        # exec helm in a task
        exec_task = asyncio.ensure_future(
            self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="scale",
                run_once=False,
            )
        )

        try:
            # wait for execution task
            await asyncio.wait([exec_task])
        finally:
            # the status task must not outlive this call, even when the
            # wait is interrupted (e.g. the caller is cancelled)
            status_task.cancel()
        output, rc = exec_task.result()

    else:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="scale",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return True
666
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    cluster_uuid: str,
    kdu_model: str,
    **kwargs,
) -> int:
    """Get a resource scale count.

    The count is read from the deployed instance; when it is not found
    there, the value found for the chart in its repository is used.

    Args:
        cluster_uuid: The UUID of the cluster
        resource_name: Resource name
        kdu_instance: KDU instance name
        kdu_model: The name or path of a bundle
        kwargs: Additional parameters

    Returns:
        Resource instance count

    Raises:
        K8sException: if the instance or the repository of the model is not
            found, or no replica count can be obtained
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "getting scale count for {} in cluster {}".format(kdu_model, cluster_id)
    )

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    replicas = await self._get_replica_count_instance(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )

    # Get default value if scale count is not found from provided values.
    # A count of 0 is a valid value (resource scaled down to zero), so only
    # a missing value triggers the fallback.
    # NOTE(review): assumes the helpers report "not found" as None (or an
    # empty string) -- confirm against their implementation.
    if replicas is None or replicas == "":
        repo_url = await self._find_repo(kdu_model, cluster_uuid)
        if not repo_url:
            raise K8sException(
                "Repository not found for kdu_model {}".format(kdu_model)
            )

        replicas, _ = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        if replicas is None or replicas == "":
            msg = "Replica count not found. Cannot be scaled"
            self.log.error(msg)
            raise K8sException(msg)

    return int(replicas)
725
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll back a KDU instance to a previous revision.

    The command runs in a task while a second task keeps storing the status;
    the namespace is taken from the currently deployed instance.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to roll back
    :param revision: revision passed to the rollback command
    :param db_dict: passed to _store_status
    :return: revision reported for the instance after the rollback, or 0 when
        the instance cannot be found afterwards
    :raises K8sException: if the instance does not exist or the command
        returns a non-zero code
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    command = self._get_rollback_command(
        kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
    )

    self.log.debug("rolling_back: {}".format(command))

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    try:
        # wait for execution task
        await asyncio.wait([exec_task])
    finally:
        # the status task must not outlive this call, even when the wait is
        # interrupted (e.g. the caller is cancelled)
        status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
814
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance is not found (nothing to remove);
        otherwise the command output converted by _output_to_table
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the deployed release is needed to know its namespace
    release = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release:
        not_found_msg = "kdu_instance {} not found".format(kdu_instance)
        self.log.warning(not_found_msg)
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    release_namespace = release["namespace"]
    command = self._get_uninstall_command(
        kdu_instance, release_namespace, paths["kube_config"]
    )
    # a failing command raises through _local_async_exec; the return code
    # is not needed here
    output, _ = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(output)
859
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the releases deployed in a cluster.

    The local cluster directory is synced before, and reverse-synced after,
    delegating to the internal _instances_list.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the value returned by _instances_list for the cluster
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    self.fs.sync(from_path=cluster_id)
    releases = await self._instances_list(cluster_id)
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
881
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a deployed release by name.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to look for
    :return: the first entry of instances_list whose 'name' equals
        kdu_instance, or None when there is no such entry
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
889
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Not available for KDUs deployed with Helm: this method always raises
    and none of its arguments is used.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    unsupported_msg = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported_msg)
916
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    The service names are obtained with _get_services and each one is then
    resolved, one after another, with _get_service.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # get list of services names for kdu
    kube_config = paths["kube_config"]
    service_names = await self._get_services(
        cluster_id, kdu_instance, namespace, kube_config
    )

    # resolve the names sequentially, keeping their order
    service_list = [
        await self._get_service(cluster_id, service_name, namespace)
        for service_name in service_names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return service_list
965
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Get the data of one service of a cluster.

    The local cluster directory is synced before, and reverse-synced after,
    delegating to the internal _get_service.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: the value returned by _get_service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    self.fs.sync(from_path=cluster_id)
    service_data = await self._get_service(cluster_id, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_id)

    return service_data
987
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    This call retrieves the current state of a given KDU instance. It allows
    to retrieve the _composition_ (i.e. K8s objects) and _specific values_ of
    the configuration parameters applied to a given instance. This call is
    based on the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: If successful, it will return the following vector of arguments:
    - K8s `namespace` in the cluster where the KDU lives
    - `state` of the KDU instance. It can be:
          - UNKNOWN
          - DEPLOYED
          - DELETED
          - SUPERSEDED
          - FAILED or
          - DELETING
    - List of `resources` (objects) that this release consists of, sorted by kind,
      and the status of those resources
    - Last `deployment_time`.
    :raises K8sException: if the instance is not found in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the deployed release is needed to obtain its namespace
    releases = await self._instances_list(cluster_id=cluster_id)
    release = next(
        (item for item in releases if item.get("name") == kdu_instance), None
    )
    if release is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_id
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return status
1048
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Get the values of a deployed KDU instance.

    :param kdu_instance: name of the release
    :param namespace: K8s namespace of the release
    :param kubeconfig: kubeconfig passed to the command
    :return: the result of _exec_get_command for the "values" get command
    """

    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    values = await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
    return values
1061
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a KDU model.

    :param kdu_model: chart reference
    :param repo_url: optional repository of the chart
    :return: the result of _exec_inspect_command for the "values" command
    """

    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    values = await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return values
1073
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a KDU model.

    :param kdu_model: chart reference
    :param repo_url: optional repository of the chart
    :return: the result of _exec_inspect_command for the "readme" command
    """

    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
1083
async def synchronize_repos(self, cluster_uuid: str):
    """
    Make the repositories of a cluster match the ones stored in the database.

    Repositories of the database that are missing locally, or registered with
    a different URL, are (re)added; local repositories that are not in the
    database are removed, except the one named "stable".

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: tuple (deleted_repo_list, added_repo_dict). deleted_repo_list
        holds the database id of the repos removed because their URL changed
        and the name of the local repos removed because they are not in the
        database; added_repo_dict maps database id to name of the added repos
    :raises K8sException: if a repository of the database cannot be added
    :raises Exception: on any other error; failures removing local
        repositories that are not in the database are only logged
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # read before the try so the error message can always include it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and and again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repo that cannot be removed is only logged
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other error is logged and reported as a generic Exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
1149
1150 def _get_db_repos_dict(self, repo_ids: list):
1151 db_repos_dict = {}
1152 for repo_id in repo_ids:
1153 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1154 db_repos_dict[db_repo["name"]] = db_repo
1155 return db_repos_dict
1156
1157 """
1158 ####################################################################################
1159 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1160 ####################################################################################
1161 """
1162
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    Also creates the helm3 dirs according to the new directory specification;
    those paths are not returned but assigned to helm environment variables

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably whether missing dirs are created -
        TODO confirm against the subclass implementations
    :return: Dictionary with config_paths and dictionary with helm environment
        variables (callers unpack it as ``paths, env`` and read
        ``paths["kube_config"]``)
    """
1173
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    To be provided by each helm version specific subclass.
    """
1179
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    To be provided by each helm version specific subclass.
    """
1185
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    To be provided by each helm version specific subclass.
    """
1191
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Implements the helm version dependent method to obtain status of a helm instance

    NOTE(review): _store_status calls this with return_text=False and reads
    ``.get("info").get("description")`` from the result, so in that mode the
    result is expected to be dict-like - confirm in the subclasses.
    """
1204
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance
    """
1220
@abc.abstractmethod
def _get_upgrade_scale_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    count,
    version,
    atomic,
    replicas,
    timeout,
    resource_name,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance.

    NOTE(review): judging by the name and the count/replicas/resource_name
    parameters this is the upgrade variant used for scaling - confirm in the
    subclasses.
    """
1236
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    To be provided by each helm version specific subclass.
    """
1252
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    To be provided by each helm version specific subclass.
    """
1260
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    To be provided by each helm version specific subclass.
    """
1268
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain command to be executed to obtain information about the kdu

    :param show_command: inspect sub-command; callers in this class pass
        "values" or "readme"
    :param kdu_model: kdu model to inspect
    :param repo_str: ready-made " --repo <url>" option, or an empty string
    :param version: ready-made "--version <version>" option, or an empty string
    """
1276
@abc.abstractmethod
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Obtain command to be executed to get information about the kdu instance.

    :param get_command: get sub-command; get_values_kdu passes "values"
    :param kdu_instance: name of the kdu instance
    :param namespace: namespace of the kdu instance
    :param kubeconfig: kubeconfig to be used by the command
    """
1282
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    on the helm version
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called
    """
1291
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    synchronize_repos feeds the returned list to _get_db_repos_dict, which
    looks each identifier up as an ``_id`` in the "k8srepos" collection.
    """
1297
1298 """
1299 ####################################################################################
1300 ################################### P R I V A T E ##################################
1301 ####################################################################################
1302 """
1303
1304 @staticmethod
1305 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1306 if os.path.exists(filename):
1307 return True
1308 else:
1309 msg = "File {} does not exist".format(filename)
1310 if exception_if_not_exists:
1311 raise K8sException(msg)
1312
1313 @staticmethod
1314 def _remove_multiple_spaces(strobj):
1315 strobj = strobj.strip()
1316 while " " in strobj:
1317 strobj = strobj.replace(" ", " ")
1318 return strobj
1319
1320 @staticmethod
1321 def _output_to_lines(output: str) -> list:
1322 output_lines = list()
1323 lines = output.splitlines(keepends=False)
1324 for line in lines:
1325 line = line.strip()
1326 if len(line) > 0:
1327 output_lines.append(line)
1328 return output_lines
1329
1330 @staticmethod
1331 def _output_to_table(output: str) -> list:
1332 output_table = list()
1333 lines = output.splitlines(keepends=False)
1334 for line in lines:
1335 line = line.replace("\t", " ")
1336 line_list = list()
1337 output_table.append(line_list)
1338 cells = line.split(sep=" ")
1339 for cell in cells:
1340 cell = cell.strip()
1341 if len(cell) > 0:
1342 line_list.append(cell)
1343 return output_table
1344
1345 @staticmethod
1346 def _parse_services(output: str) -> list:
1347 lines = output.splitlines(keepends=False)
1348 services = []
1349 for line in lines:
1350 line = line.replace("\t", " ")
1351 cells = line.split(sep=" ")
1352 if len(cells) > 0 and cells[0].startswith("service/"):
1353 elems = cells[0].split(sep="/")
1354 if len(elems) > 1:
1355 services.append(elems[1])
1356 return services
1357
1358 @staticmethod
1359 def _get_deep(dictionary: dict, members: tuple):
1360 target = dictionary
1361 value = None
1362 try:
1363 for m in members:
1364 value = target.get(m)
1365 if not value:
1366 return None
1367 else:
1368 target = value
1369 except Exception:
1370 pass
1371 return value
1372
1373 # find key:value in several lines
1374 @staticmethod
1375 def _find_in_lines(p_lines: list, p_key: str) -> str:
1376 for line in p_lines:
1377 try:
1378 if line.startswith(p_key + ":"):
1379 parts = line.split(":")
1380 the_value = parts[1].strip()
1381 return the_value
1382 except Exception:
1383 # ignore it
1384 pass
1385 return None
1386
1387 @staticmethod
1388 def _lower_keys_list(input_list: list):
1389 """
1390 Transform the keys in a list of dictionaries to lower case and returns a new list
1391 of dictionaries
1392 """
1393 new_list = []
1394 if input_list:
1395 for dictionary in input_list:
1396 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1397 new_list.append(new_dict)
1398 return new_list
1399
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Executes a local command without a shell and waits for it to finish

    :param command: command line; runs of spaces are collapsed and the result
        is split with shlex
    :param raise_exception_on_error: raise K8sException if the command exits
        with a non-zero code or cannot be executed
    :param show_error_log: log the output when the return code is not zero
    :param encode_utf8: return the textual representation of the utf-8 encoded
        output instead of the plain text
    :param env: extra environment variables, added on top of os.environ
    :return: tuple (output, return_code). On failure the output is the stderr
        text (stdout if stderr is empty); on success it is the stdout text
        (stderr if stdout is empty). ("", -1) is returned if the command could
        not be executed and raise_exception_on_error is False.
    :raises K8sException: see raise_exception_on_error
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""
        if return_code != 0:
            # failed: the diagnostics are what the caller needs
            output = err_text or out_text
        else:
            # succeeded: a warning printed on stderr must not replace the
            # real result printed on stdout
            output = out_text or err_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1469
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Executes "command1 | command2" locally without a shell

    The standard output of command1 is connected to the standard input of
    command2 through an os.pipe; the result is taken from command2 only.

    :param command1: command line producing the data
    :param command2: command line consuming the data
    :param raise_exception_on_error: raise K8sException if command2 exits with
        a non-zero code or the commands cannot be executed
    :param show_error_log: log the output when the return code is not zero
    :param encode_utf8: return the textual representation of the utf-8 encoded
        output instead of the plain text
    :param env: extra environment variables, added on top of os.environ
    :return: tuple (output, return_code) of command2, or ("", -1) if the
        commands could not be executed and raise_exception_on_error is False
    :raises K8sException: see raise_exception_on_error
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read_fd, write_fd = os.pipe()
        try:
            await asyncio.create_subprocess_exec(
                *command1, stdout=write_fd, env=environ
            )
            # the parent's copy of the write end must be closed, otherwise
            # command2 never sees the end of its input
            os.close(write_fd)
            write_fd = None
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read_fd, stdout=asyncio.subprocess.PIPE, env=environ
            )
        finally:
            # never leak the descriptors, also when spawning fails
            if write_fd is not None:
                os.close(write_fd)
            os.close(read_fd)

        # stderr of command2 is not piped, so only stdout is collected here
        stdout, _stderr = await process_2.communicate()

        return_code = process_2.returncode

        output = stdout.decode("utf-8").strip() if stdout else ""

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1541
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, empty while no ip is assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # the ingress list is missing until the load balancer is provisioned,
        # and an entry may carry only a hostname instead of an ip
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if "ip" in elem
        ]

    return service
1585
async def _exec_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Builds the get command for a kdu instance, runs it and returns its output.

    The return code of the command is not checked.
    """
    command_text, _ = await self._local_async_exec(
        command=self._get_get_command(
            get_command, kdu_instance, namespace, kubeconfig
        )
    )
    return command_text
1598
async def _exec_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Obtains information about a kdu, no cluster (no env).

    :param inspect_command: inspect sub-command ("values", "readme", ...)
    :param kdu_model: kdu model, optionally followed by ":<version>"
    :param repo_url: optional repository url; when given, the chart is looked
        up in that repository and the "<repo>/" prefix of kdu_model is dropped
    :return: output of the command (its return code is not checked)
    """

    repo_str = ""
    if repo_url:
        repo_str = " --repo {}".format(repo_url)

        # The repo alias is only redundant when the url is passed explicitly.
        # Without a url the model must stay untouched: it may be an absolute
        # path to an embedded chart or need its alias to be resolved.
        idx = kdu_model.find("/")
        if idx >= 0:
            kdu_model = kdu_model[idx + 1 :]

    kdu_model, version = self._split_version(kdu_model)
    if version:
        version_str = "--version {}".format(version)
    else:
        version_str = ""

    full_command = self._get_inspect_command(
        inspect_command, kdu_model, repo_str, version_str
    )

    output, _rc = await self._local_async_exec(command=full_command)

    return output
1626
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str,
    resource_name: str = None,
):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of a bundle
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica count is read
            from that entry of the values instead of the top level

    Returns:
        Tuple (replicas, replica_str): the replica count found and the name
        of the key holding it ("replicaCount" or "replicas")

    Raises:
        K8sException: if the values or the resource are not found, if neither
            key holds a value, or if both keys are present
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
    )

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    if kdu_values.get("replicaCount", None):
        replica_str, other_key = "replicaCount", "replicas"
    elif kdu_values.get("replicas", None):
        replica_str, other_key = "replicas", "replicaCount"
    else:
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time
    if other_key in kdu_values:
        msg = "replicaCount and replicas exist at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    return kdu_values[replica_str], replica_str
1703
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
):
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: kubeconfig handed over to get_values_kdu
        resource_name: Resource name

    Returns:
        The "replicaCount" value, or else the "replicas" value, read from the
        resource entry of the instance values when that entry has content and
        from the top level otherwise; None if nothing is found
    """

    raw_values = await self.get_values_kdu(kdu_instance, namespace, kubeconfig)
    kdu_values = yaml.load(raw_values, Loader=yaml.SafeLoader)

    if not kdu_values:
        return None

    resource_values = None
    if resource_name:
        resource_values = kdu_values.get(resource_name, None)

    # fall back to the top level values when the resource has no entry
    lookup = resource_values if resource_values else kdu_values
    return lookup.get("replicaCount", None) or lookup.get("replicas", None)
1747
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Periodically reads the status of a kdu instance and writes it to the database

    Each iteration sleeps check_every seconds, obtains the status with
    _status_kdu and stores it with write_app_status_to_db. The loop ends when
    the write reports a falsy result, when the task is cancelled or, if
    run_once is set, after the first iteration.

    :param cluster_id: cluster identifier passed to _status_kdu
    :param operation: operation name stored together with the status
    :param kdu_instance: name of the kdu instance
    :param namespace: namespace passed to _status_kdu
    :param check_every: seconds to sleep before each status read
    :param db_dict: passed through to write_app_status_to_db
    :param run_once: do a single iteration instead of looping
    :return: None
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                return_text=False,
            )
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            # cancellation ends the task silently, it is not propagated
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # any other error is only logged; the next iteration retries
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )
            pass
        finally:
            # runs on every path above, so with run_once the method always
            # returns after the first iteration, whatever its outcome
            if run_once:
                return
1790
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dumps the params into a yaml values file and returns the "-f" option for it

    String values containing the "!!yaml" marker are parsed as yaml (after
    dropping their first 7 characters) before being dumped.

    :param cluster_id: cluster identifier, used to initialize the cluster paths
    :param params: dictionary with the values to write
    :return: tuple ("-f <file>", <file>), or ("", None) if there are no params.
        The file is created in the current working directory and is not
        removed here.
    """

    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    params2 = dict()
    for key, value in params.items():
        if "!!yaml" in str(value):
            # values come from the user: parse them with the safe loader,
            # as the rest of this class does
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        params2[key] = value

    # random, zero padded, 10 characters long file name
    random_name = str(random.randrange(start=1, stop=99999999)).zfill(10)
    values_file = random_name + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1819
1820 # params for use in --set option
1821 @staticmethod
1822 def _params_to_set_option(params: dict) -> str:
1823 params_str = ""
1824 if params and len(params) > 0:
1825 start = True
1826 for key in params:
1827 value = params.get(key, None)
1828 if value is not None:
1829 if start:
1830 params_str += "--set "
1831 start = False
1832 else:
1833 params_str += ","
1834 params_str += "{}={}".format(key, value)
1835 return params_str
1836
1837 @staticmethod
1838 def generate_kdu_instance_name(**kwargs):
1839 chart_name = kwargs["kdu_model"]
1840 # check embeded chart (file or dir)
1841 if chart_name.startswith("/"):
1842 # extract file or directory name
1843 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1844 # check URL
1845 elif "://" in chart_name:
1846 # extract last portion of URL
1847 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1848
1849 name = ""
1850 for c in chart_name:
1851 if c.isalpha() or c.isnumeric():
1852 name += c
1853 else:
1854 name += "-"
1855 if len(name) > 35:
1856 name = name[0:35]
1857
1858 # if does not start with alpha character, prefix 'a'
1859 if not name[0].isalpha():
1860 name = "a" + name
1861
1862 name += "-"
1863
1864 def get_random_number():
1865 r = random.randrange(start=1, stop=99999999)
1866 s = str(r)
1867 s = s.rjust(10, "0")
1868 return s
1869
1870 name = name + get_random_number()
1871 return name.lower()
1872
1873 def _split_version(self, kdu_model: str) -> (str, str):
1874 version = None
1875 if ":" in kdu_model:
1876 parts = kdu_model.split(sep=":")
1877 if len(parts) == 2:
1878 version = str(parts[1])
1879 kdu_model = parts[0]
1880 return kdu_model, version
1881
1882 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1883 repo_url = None
1884 idx = kdu_model.find("/")
1885 if idx >= 0:
1886 repo_name = kdu_model[:idx]
1887 # Find repository link
1888 local_repo_list = await self.repo_list(cluster_uuid)
1889 for repo in local_repo_list:
1890 repo_url = repo["url"] if repo["name"] == repo_name else None
1891 return repo_url