feature: helm charts repos with certs
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import os
30 import yaml
31 from uuid import uuid4
32
33 from n2vc.config import EnvironConfig
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45
46 service_account = "osm"
47
48 def __init__(
49 self,
50 fs: object,
51 db: object,
52 kubectl_command: str = "/usr/bin/kubectl",
53 helm_command: str = "/usr/bin/helm",
54 log: object = None,
55 on_update_db=None,
56 ):
57 """
58
59 :param fs: file system for kubernetes and helm configuration
60 :param db: database object to write current operation status
61 :param kubectl_command: path to kubectl executable
62 :param helm_command: path to helm executable
63 :param log: logger
64 :param on_update_db: callback called when k8s connector updates database
65 """
66
67 # parent class
68 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
69
70 self.log.info("Initializing K8S Helm connector")
71
72 self.config = EnvironConfig()
73 # random numbers for release name generation
74 random.seed(time.time())
75
76 # the file system
77 self.fs = fs
78
79 # exception if kubectl is not installed
80 self.kubectl_command = kubectl_command
81 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
82
83 # exception if helm is not installed
84 self._helm_command = helm_command
85 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
86
87 # obtain stable repo url from config or apply default
88 self._stable_repo_url = self.config.get("stablerepourl")
89 if self._stable_repo_url == "None":
90 self._stable_repo_url = None
91
92 def _get_namespace(self, cluster_uuid: str) -> str:
93 """
94 Obtains the namespace used by the cluster with the uuid passed by argument
95
96 param: cluster_uuid: cluster's uuid
97 """
98
99 # first, obtain the cluster corresponding to the uuid passed by argument
100 k8scluster = self.db.get_one(
101 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
102 )
103 return k8scluster.get("namespace")
104
105 async def init_env(
106 self,
107 k8s_creds: str,
108 namespace: str = "kube-system",
109 reuse_cluster_uuid=None,
110 **kwargs,
111 ) -> (str, bool):
112 """
113 It prepares a given K8s cluster environment to run Charts
114
115 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
116 '.kube/config'
117 :param namespace: optional namespace to be used for helm. By default,
118 'kube-system' will be used
119 :param reuse_cluster_uuid: existing cluster uuid for reuse
120 :param kwargs: Additional parameters (None yet)
121 :return: uuid of the K8s cluster and True if connector has installed some
122 software in the cluster
123 (on error, an exception will be raised)
124 """
125
126 if reuse_cluster_uuid:
127 cluster_id = reuse_cluster_uuid
128 else:
129 cluster_id = str(uuid4())
130
131 self.log.debug(
132 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
133 )
134
135 paths, env = self._init_paths_env(
136 cluster_name=cluster_id, create_if_not_exist=True
137 )
138 mode = stat.S_IRUSR | stat.S_IWUSR
139 with open(paths["kube_config"], "w", mode) as f:
140 f.write(k8s_creds)
141 os.chmod(paths["kube_config"], 0o600)
142
143 # Code with initialization specific of helm version
144 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
145
146 # sync fs with local data
147 self.fs.reverse_sync(from_path=cluster_id)
148
149 self.log.info("Cluster {} initialized".format(cluster_id))
150
151 return cluster_id, n2vc_installed_sw
152
153 async def repo_add(
154 self,
155 cluster_uuid: str,
156 name: str,
157 url: str,
158 repo_type: str = "chart",
159 cert: str = None,
160 user: str = None,
161 password: str = None,
162 ):
163 self.log.debug(
164 "Cluster {}, adding {} repository {}. URL: {}".format(
165 cluster_uuid, repo_type, name, url
166 )
167 )
168
169 # init_env
170 paths, env = self._init_paths_env(
171 cluster_name=cluster_uuid, create_if_not_exist=True
172 )
173
174 # sync local dir
175 self.fs.sync(from_path=cluster_uuid)
176
177 # helm repo update
178 command = "env KUBECONFIG={} {} repo update".format(
179 paths["kube_config"], self._helm_command
180 )
181 self.log.debug("updating repo: {}".format(command))
182 await self._local_async_exec(
183 command=command, raise_exception_on_error=False, env=env
184 )
185
186 # helm repo add name url
187 command = ("env KUBECONFIG={} {} repo add {} {}").format(
188 paths["kube_config"], self._helm_command, name, url
189 )
190
191 if cert:
192 temp_cert_file = os.path.join(
193 self.fs.path, "{}/helmcerts/".format(cluster_id), "temp.crt"
194 )
195 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
196 with open(temp_cert_file, "w") as the_cert:
197 the_cert.write(cert)
198 command += " --ca-file {}".format(temp_cert_file)
199
200 if user:
201 command += " --username={}".format(user)
202
203 if password:
204 command += " --password={}".format(password)
205
206 self.log.debug("adding repo: {}".format(command))
207 await self._local_async_exec(
208 command=command, raise_exception_on_error=True, env=env
209 )
210
211 # sync fs
212 self.fs.reverse_sync(from_path=cluster_uuid)
213
214 async def repo_list(self, cluster_uuid: str) -> list:
215 """
216 Get the list of registered repositories
217
218 :return: list of registered repositories: [ (name, url) .... ]
219 """
220
221 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
222
223 # config filename
224 paths, env = self._init_paths_env(
225 cluster_name=cluster_uuid, create_if_not_exist=True
226 )
227
228 # sync local dir
229 self.fs.sync(from_path=cluster_uuid)
230
231 command = "env KUBECONFIG={} {} repo list --output yaml".format(
232 paths["kube_config"], self._helm_command
233 )
234
235 # Set exception to false because if there are no repos just want an empty list
236 output, _rc = await self._local_async_exec(
237 command=command, raise_exception_on_error=False, env=env
238 )
239
240 # sync fs
241 self.fs.reverse_sync(from_path=cluster_uuid)
242
243 if _rc == 0:
244 if output and len(output) > 0:
245 repos = yaml.load(output, Loader=yaml.SafeLoader)
246 # unify format between helm2 and helm3 setting all keys lowercase
247 return self._lower_keys_list(repos)
248 else:
249 return []
250 else:
251 return []
252
253 async def repo_remove(self, cluster_uuid: str, name: str):
254 self.log.debug(
255 "remove {} repositories for cluster {}".format(name, cluster_uuid)
256 )
257
258 # init env, paths
259 paths, env = self._init_paths_env(
260 cluster_name=cluster_uuid, create_if_not_exist=True
261 )
262
263 # sync local dir
264 self.fs.sync(from_path=cluster_uuid)
265
266 command = "env KUBECONFIG={} {} repo remove {}".format(
267 paths["kube_config"], self._helm_command, name
268 )
269 await self._local_async_exec(
270 command=command, raise_exception_on_error=True, env=env
271 )
272
273 # sync fs
274 self.fs.reverse_sync(from_path=cluster_uuid)
275
276 async def reset(
277 self,
278 cluster_uuid: str,
279 force: bool = False,
280 uninstall_sw: bool = False,
281 **kwargs,
282 ) -> bool:
283 """Reset a cluster
284
285 Resets the Kubernetes cluster by removing the helm deployment that represents it.
286
287 :param cluster_uuid: The UUID of the cluster to reset
288 :param force: Boolean to force the reset
289 :param uninstall_sw: Boolean to force the reset
290 :param kwargs: Additional parameters (None yet)
291 :return: Returns True if successful or raises an exception.
292 """
293 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
294 self.log.debug(
295 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
296 cluster_uuid, uninstall_sw
297 )
298 )
299
300 # sync local dir
301 self.fs.sync(from_path=cluster_uuid)
302
303 # uninstall releases if needed.
304 if uninstall_sw:
305 releases = await self.instances_list(cluster_uuid=cluster_uuid)
306 if len(releases) > 0:
307 if force:
308 for r in releases:
309 try:
310 kdu_instance = r.get("name")
311 chart = r.get("chart")
312 self.log.debug(
313 "Uninstalling {} -> {}".format(chart, kdu_instance)
314 )
315 await self.uninstall(
316 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
317 )
318 except Exception as e:
319 # will not raise exception as it was found
320 # that in some cases of previously installed helm releases it
321 # raised an error
322 self.log.warn(
323 "Error uninstalling release {}: {}".format(
324 kdu_instance, e
325 )
326 )
327 else:
328 msg = (
329 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
330 ).format(cluster_uuid)
331 self.log.warn(msg)
332 uninstall_sw = (
333 False # Allow to remove k8s cluster without removing Tiller
334 )
335
336 if uninstall_sw:
337 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
338
339 # delete cluster directory
340 self.log.debug("Removing directory {}".format(cluster_uuid))
341 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
342 # Remove also local directorio if still exist
343 direct = self.fs.path + "/" + cluster_uuid
344 shutil.rmtree(direct, ignore_errors=True)
345
346 return True
347
348 async def _install_impl(
349 self,
350 cluster_id: str,
351 kdu_model: str,
352 paths: dict,
353 env: dict,
354 kdu_instance: str,
355 atomic: bool = True,
356 timeout: float = 300,
357 params: dict = None,
358 db_dict: dict = None,
359 kdu_name: str = None,
360 namespace: str = None,
361 ):
362 # init env, paths
363 paths, env = self._init_paths_env(
364 cluster_name=cluster_id, create_if_not_exist=True
365 )
366
367 # params to str
368 params_str, file_to_delete = self._params_to_file_option(
369 cluster_id=cluster_id, params=params
370 )
371
372 # version
373 kdu_model, version = self._split_version(kdu_model)
374
375 command = self._get_install_command(
376 kdu_model,
377 kdu_instance,
378 namespace,
379 params_str,
380 version,
381 atomic,
382 timeout,
383 paths["kube_config"],
384 )
385
386 self.log.debug("installing: {}".format(command))
387
388 if atomic:
389 # exec helm in a task
390 exec_task = asyncio.ensure_future(
391 coro_or_future=self._local_async_exec(
392 command=command, raise_exception_on_error=False, env=env
393 )
394 )
395
396 # write status in another task
397 status_task = asyncio.ensure_future(
398 coro_or_future=self._store_status(
399 cluster_id=cluster_id,
400 kdu_instance=kdu_instance,
401 namespace=namespace,
402 db_dict=db_dict,
403 operation="install",
404 run_once=False,
405 )
406 )
407
408 # wait for execution task
409 await asyncio.wait([exec_task])
410
411 # cancel status task
412 status_task.cancel()
413
414 output, rc = exec_task.result()
415
416 else:
417
418 output, rc = await self._local_async_exec(
419 command=command, raise_exception_on_error=False, env=env
420 )
421
422 # remove temporal values yaml file
423 if file_to_delete:
424 os.remove(file_to_delete)
425
426 # write final status
427 await self._store_status(
428 cluster_id=cluster_id,
429 kdu_instance=kdu_instance,
430 namespace=namespace,
431 db_dict=db_dict,
432 operation="install",
433 run_once=True,
434 check_every=0,
435 )
436
437 if rc != 0:
438 msg = "Error executing command: {}\nOutput: {}".format(command, output)
439 self.log.error(msg)
440 raise K8sException(msg)
441
442 async def upgrade(
443 self,
444 cluster_uuid: str,
445 kdu_instance: str,
446 kdu_model: str = None,
447 atomic: bool = True,
448 timeout: float = 300,
449 params: dict = None,
450 db_dict: dict = None,
451 ):
452 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
453
454 # sync local dir
455 self.fs.sync(from_path=cluster_uuid)
456
457 # look for instance to obtain namespace
458 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
459 if not instance_info:
460 raise K8sException("kdu_instance {} not found".format(kdu_instance))
461
462 # init env, paths
463 paths, env = self._init_paths_env(
464 cluster_name=cluster_uuid, create_if_not_exist=True
465 )
466
467 # sync local dir
468 self.fs.sync(from_path=cluster_uuid)
469
470 # params to str
471 params_str, file_to_delete = self._params_to_file_option(
472 cluster_id=cluster_uuid, params=params
473 )
474
475 # version
476 kdu_model, version = self._split_version(kdu_model)
477
478 command = self._get_upgrade_command(
479 kdu_model,
480 kdu_instance,
481 instance_info["namespace"],
482 params_str,
483 version,
484 atomic,
485 timeout,
486 paths["kube_config"],
487 )
488
489 self.log.debug("upgrading: {}".format(command))
490
491 if atomic:
492
493 # exec helm in a task
494 exec_task = asyncio.ensure_future(
495 coro_or_future=self._local_async_exec(
496 command=command, raise_exception_on_error=False, env=env
497 )
498 )
499 # write status in another task
500 status_task = asyncio.ensure_future(
501 coro_or_future=self._store_status(
502 cluster_id=cluster_uuid,
503 kdu_instance=kdu_instance,
504 namespace=instance_info["namespace"],
505 db_dict=db_dict,
506 operation="upgrade",
507 run_once=False,
508 )
509 )
510
511 # wait for execution task
512 await asyncio.wait([exec_task])
513
514 # cancel status task
515 status_task.cancel()
516 output, rc = exec_task.result()
517
518 else:
519
520 output, rc = await self._local_async_exec(
521 command=command, raise_exception_on_error=False, env=env
522 )
523
524 # remove temporal values yaml file
525 if file_to_delete:
526 os.remove(file_to_delete)
527
528 # write final status
529 await self._store_status(
530 cluster_id=cluster_uuid,
531 kdu_instance=kdu_instance,
532 namespace=instance_info["namespace"],
533 db_dict=db_dict,
534 operation="upgrade",
535 run_once=True,
536 check_every=0,
537 )
538
539 if rc != 0:
540 msg = "Error executing command: {}\nOutput: {}".format(command, output)
541 self.log.error(msg)
542 raise K8sException(msg)
543
544 # sync fs
545 self.fs.reverse_sync(from_path=cluster_uuid)
546
547 # return new revision number
548 instance = await self.get_instance_info(
549 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
550 )
551 if instance:
552 revision = int(instance.get("revision"))
553 self.log.debug("New revision: {}".format(revision))
554 return revision
555 else:
556 return 0
557
558 async def scale(
559 self,
560 kdu_instance: str,
561 scale: int,
562 resource_name: str,
563 total_timeout: float = 1800,
564 cluster_uuid: str = None,
565 kdu_model: str = None,
566 atomic: bool = True,
567 db_dict: dict = None,
568 **kwargs,
569 ):
570 """Scale a resource in a Helm Chart.
571
572 Args:
573 kdu_instance: KDU instance name
574 scale: Scale to which to set the resource
575 resource_name: Resource name
576 total_timeout: The time, in seconds, to wait
577 cluster_uuid: The UUID of the cluster
578 kdu_model: The chart reference
579 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
580 The --wait flag will be set automatically if --atomic is used
581 db_dict: Dictionary for any additional data
582 kwargs: Additional parameters
583
584 Returns:
585 True if successful, False otherwise
586 """
587
588 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
589 if resource_name:
590 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
591 resource_name, kdu_model, cluster_uuid
592 )
593
594 self.log.debug(debug_mgs)
595
596 # look for instance to obtain namespace
597 # get_instance_info function calls the sync command
598 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
599 if not instance_info:
600 raise K8sException("kdu_instance {} not found".format(kdu_instance))
601
602 # init env, paths
603 paths, env = self._init_paths_env(
604 cluster_name=cluster_uuid, create_if_not_exist=True
605 )
606
607 # version
608 kdu_model, version = self._split_version(kdu_model)
609
610 repo_url = await self._find_repo(kdu_model, cluster_uuid)
611 if not repo_url:
612 raise K8sException(
613 "Repository not found for kdu_model {}".format(kdu_model)
614 )
615
616 _, replica_str = await self._get_replica_count_url(
617 kdu_model, repo_url, resource_name
618 )
619
620 command = self._get_upgrade_scale_command(
621 kdu_model,
622 kdu_instance,
623 instance_info["namespace"],
624 scale,
625 version,
626 atomic,
627 replica_str,
628 total_timeout,
629 resource_name,
630 paths["kube_config"],
631 )
632
633 self.log.debug("scaling: {}".format(command))
634
635 if atomic:
636 # exec helm in a task
637 exec_task = asyncio.ensure_future(
638 coro_or_future=self._local_async_exec(
639 command=command, raise_exception_on_error=False, env=env
640 )
641 )
642 # write status in another task
643 status_task = asyncio.ensure_future(
644 coro_or_future=self._store_status(
645 cluster_id=cluster_uuid,
646 kdu_instance=kdu_instance,
647 namespace=instance_info["namespace"],
648 db_dict=db_dict,
649 operation="scale",
650 run_once=False,
651 )
652 )
653
654 # wait for execution task
655 await asyncio.wait([exec_task])
656
657 # cancel status task
658 status_task.cancel()
659 output, rc = exec_task.result()
660
661 else:
662 output, rc = await self._local_async_exec(
663 command=command, raise_exception_on_error=False, env=env
664 )
665
666 # write final status
667 await self._store_status(
668 cluster_id=cluster_uuid,
669 kdu_instance=kdu_instance,
670 namespace=instance_info["namespace"],
671 db_dict=db_dict,
672 operation="scale",
673 run_once=True,
674 check_every=0,
675 )
676
677 if rc != 0:
678 msg = "Error executing command: {}\nOutput: {}".format(command, output)
679 self.log.error(msg)
680 raise K8sException(msg)
681
682 # sync fs
683 self.fs.reverse_sync(from_path=cluster_uuid)
684
685 return True
686
687 async def get_scale_count(
688 self,
689 resource_name: str,
690 kdu_instance: str,
691 cluster_uuid: str,
692 kdu_model: str,
693 **kwargs,
694 ) -> int:
695 """Get a resource scale count.
696
697 Args:
698 cluster_uuid: The UUID of the cluster
699 resource_name: Resource name
700 kdu_instance: KDU instance name
701 kdu_model: The name or path of a bundle
702 kwargs: Additional parameters
703
704 Returns:
705 Resource instance count
706 """
707
708 self.log.debug(
709 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
710 )
711
712 # look for instance to obtain namespace
713 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
714 if not instance_info:
715 raise K8sException("kdu_instance {} not found".format(kdu_instance))
716
717 # init env, paths
718 paths, env = self._init_paths_env(
719 cluster_name=cluster_uuid, create_if_not_exist=True
720 )
721
722 replicas = await self._get_replica_count_instance(
723 kdu_instance, instance_info["namespace"], paths["kube_config"]
724 )
725
726 # Get default value if scale count is not found from provided values
727 if not replicas:
728 repo_url = await self._find_repo(kdu_model, cluster_uuid)
729 if not repo_url:
730 raise K8sException(
731 "Repository not found for kdu_model {}".format(kdu_model)
732 )
733
734 replicas, _ = await self._get_replica_count_url(
735 kdu_model, repo_url, resource_name
736 )
737
738 if not replicas:
739 msg = "Replica count not found. Cannot be scaled"
740 self.log.error(msg)
741 raise K8sException(msg)
742
743 return int(replicas)
744
745 async def rollback(
746 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
747 ):
748 self.log.debug(
749 "rollback kdu_instance {} to revision {} from cluster {}".format(
750 kdu_instance, revision, cluster_uuid
751 )
752 )
753
754 # sync local dir
755 self.fs.sync(from_path=cluster_uuid)
756
757 # look for instance to obtain namespace
758 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
759 if not instance_info:
760 raise K8sException("kdu_instance {} not found".format(kdu_instance))
761
762 # init env, paths
763 paths, env = self._init_paths_env(
764 cluster_name=cluster_uuid, create_if_not_exist=True
765 )
766
767 # sync local dir
768 self.fs.sync(from_path=cluster_uuid)
769
770 command = self._get_rollback_command(
771 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
772 )
773
774 self.log.debug("rolling_back: {}".format(command))
775
776 # exec helm in a task
777 exec_task = asyncio.ensure_future(
778 coro_or_future=self._local_async_exec(
779 command=command, raise_exception_on_error=False, env=env
780 )
781 )
782 # write status in another task
783 status_task = asyncio.ensure_future(
784 coro_or_future=self._store_status(
785 cluster_id=cluster_uuid,
786 kdu_instance=kdu_instance,
787 namespace=instance_info["namespace"],
788 db_dict=db_dict,
789 operation="rollback",
790 run_once=False,
791 )
792 )
793
794 # wait for execution task
795 await asyncio.wait([exec_task])
796
797 # cancel status task
798 status_task.cancel()
799
800 output, rc = exec_task.result()
801
802 # write final status
803 await self._store_status(
804 cluster_id=cluster_uuid,
805 kdu_instance=kdu_instance,
806 namespace=instance_info["namespace"],
807 db_dict=db_dict,
808 operation="rollback",
809 run_once=True,
810 check_every=0,
811 )
812
813 if rc != 0:
814 msg = "Error executing command: {}\nOutput: {}".format(command, output)
815 self.log.error(msg)
816 raise K8sException(msg)
817
818 # sync fs
819 self.fs.reverse_sync(from_path=cluster_uuid)
820
821 # return new revision number
822 instance = await self.get_instance_info(
823 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
824 )
825 if instance:
826 revision = int(instance.get("revision"))
827 self.log.debug("New revision: {}".format(revision))
828 return revision
829 else:
830 return 0
831
832 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
833 """
834 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
835 (this call should happen after all _terminate-config-primitive_ of the VNF
836 are invoked).
837
838 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
839 :param kdu_instance: unique name for the KDU instance to be deleted
840 :param kwargs: Additional parameters (None yet)
841 :return: True if successful
842 """
843
844 self.log.debug(
845 "uninstall kdu_instance {} from cluster {}".format(
846 kdu_instance, cluster_uuid
847 )
848 )
849
850 # sync local dir
851 self.fs.sync(from_path=cluster_uuid)
852
853 # look for instance to obtain namespace
854 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
855 if not instance_info:
856 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
857 return True
858 # init env, paths
859 paths, env = self._init_paths_env(
860 cluster_name=cluster_uuid, create_if_not_exist=True
861 )
862
863 # sync local dir
864 self.fs.sync(from_path=cluster_uuid)
865
866 command = self._get_uninstall_command(
867 kdu_instance, instance_info["namespace"], paths["kube_config"]
868 )
869 output, _rc = await self._local_async_exec(
870 command=command, raise_exception_on_error=True, env=env
871 )
872
873 # sync fs
874 self.fs.reverse_sync(from_path=cluster_uuid)
875
876 return self._output_to_table(output)
877
878 async def instances_list(self, cluster_uuid: str) -> list:
879 """
880 returns a list of deployed releases in a cluster
881
882 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
883 :return:
884 """
885
886 self.log.debug("list releases for cluster {}".format(cluster_uuid))
887
888 # sync local dir
889 self.fs.sync(from_path=cluster_uuid)
890
891 # execute internal command
892 result = await self._instances_list(cluster_uuid)
893
894 # sync fs
895 self.fs.reverse_sync(from_path=cluster_uuid)
896
897 return result
898
899 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
900 instances = await self.instances_list(cluster_uuid=cluster_uuid)
901 for instance in instances:
902 if instance.get("name") == kdu_instance:
903 return instance
904 self.log.debug("Instance {} not found".format(kdu_instance))
905 return None
906
907 async def exec_primitive(
908 self,
909 cluster_uuid: str = None,
910 kdu_instance: str = None,
911 primitive_name: str = None,
912 timeout: float = 300,
913 params: dict = None,
914 db_dict: dict = None,
915 **kwargs,
916 ) -> str:
917 """Exec primitive (Juju action)
918
919 :param cluster_uuid: The UUID of the cluster or namespace:cluster
920 :param kdu_instance: The unique name of the KDU instance
921 :param primitive_name: Name of action that will be executed
922 :param timeout: Timeout for action execution
923 :param params: Dictionary of all the parameters needed for the action
924 :db_dict: Dictionary for any additional data
925 :param kwargs: Additional parameters (None yet)
926
927 :return: Returns the output of the action
928 """
929 raise K8sException(
930 "KDUs deployed with Helm don't support actions "
931 "different from rollback, upgrade and status"
932 )
933
934 async def get_services(
935 self, cluster_uuid: str, kdu_instance: str, namespace: str
936 ) -> list:
937 """
938 Returns a list of services defined for the specified kdu instance.
939
940 :param cluster_uuid: UUID of a K8s cluster known by OSM
941 :param kdu_instance: unique name for the KDU instance
942 :param namespace: K8s namespace used by the KDU instance
943 :return: If successful, it will return a list of services, Each service
944 can have the following data:
945 - `name` of the service
946 - `type` type of service in the k8 cluster
947 - `ports` List of ports offered by the service, for each port includes at least
948 name, port, protocol
949 - `cluster_ip` Internal ip to be used inside k8s cluster
950 - `external_ip` List of external ips (in case they are available)
951 """
952
953 self.log.debug(
954 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
955 cluster_uuid, kdu_instance
956 )
957 )
958
959 # init env, paths
960 paths, env = self._init_paths_env(
961 cluster_name=cluster_uuid, create_if_not_exist=True
962 )
963
964 # sync local dir
965 self.fs.sync(from_path=cluster_uuid)
966
967 # get list of services names for kdu
968 service_names = await self._get_services(
969 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
970 )
971
972 service_list = []
973 for service in service_names:
974 service = await self._get_service(cluster_uuid, service, namespace)
975 service_list.append(service)
976
977 # sync fs
978 self.fs.reverse_sync(from_path=cluster_uuid)
979
980 return service_list
981
982 async def get_service(
983 self, cluster_uuid: str, service_name: str, namespace: str
984 ) -> object:
985
986 self.log.debug(
987 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
988 service_name, namespace, cluster_uuid
989 )
990 )
991
992 # sync local dir
993 self.fs.sync(from_path=cluster_uuid)
994
995 service = await self._get_service(cluster_uuid, service_name, namespace)
996
997 # sync fs
998 self.fs.reverse_sync(from_path=cluster_uuid)
999
1000 return service
1001
1002 async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
1003 """
1004 This call would retrieve tha current state of a given KDU instance. It would be
1005 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1006 values_ of the configuration parameters applied to a given instance. This call
1007 would be based on the `status` call.
1008
1009 :param cluster_uuid: UUID of a K8s cluster known by OSM
1010 :param kdu_instance: unique name for the KDU instance
1011 :param kwargs: Additional parameters (None yet)
1012 :return: If successful, it will return the following vector of arguments:
1013 - K8s `namespace` in the cluster where the KDU lives
1014 - `state` of the KDU instance. It can be:
1015 - UNKNOWN
1016 - DEPLOYED
1017 - DELETED
1018 - SUPERSEDED
1019 - FAILED or
1020 - DELETING
1021 - List of `resources` (objects) that this release consists of, sorted by kind,
1022 and the status of those resources
1023 - Last `deployment_time`.
1024
1025 """
1026 self.log.debug(
1027 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1028 cluster_uuid, kdu_instance
1029 )
1030 )
1031
1032 # sync local dir
1033 self.fs.sync(from_path=cluster_uuid)
1034
1035 # get instance: needed to obtain namespace
1036 instances = await self._instances_list(cluster_id=cluster_uuid)
1037 for instance in instances:
1038 if instance.get("name") == kdu_instance:
1039 break
1040 else:
1041 # instance does not exist
1042 raise K8sException(
1043 "Instance name: {} not found in cluster: {}".format(
1044 kdu_instance, cluster_uuid
1045 )
1046 )
1047
1048 status = await self._status_kdu(
1049 cluster_id=cluster_uuid,
1050 kdu_instance=kdu_instance,
1051 namespace=instance["namespace"],
1052 show_error_log=True,
1053 return_text=True,
1054 )
1055
1056 # sync fs
1057 self.fs.reverse_sync(from_path=cluster_uuid)
1058
1059 return status
1060
1061 async def get_values_kdu(
1062 self, kdu_instance: str, namespace: str, kubeconfig: str
1063 ) -> str:
1064
1065 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1066
1067 return await self._exec_get_command(
1068 get_command="values",
1069 kdu_instance=kdu_instance,
1070 namespace=namespace,
1071 kubeconfig=kubeconfig,
1072 )
1073
1074 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1075
1076 self.log.debug(
1077 "inspect kdu_model values {} from (optional) repo: {}".format(
1078 kdu_model, repo_url
1079 )
1080 )
1081
1082 return await self._exec_inspect_command(
1083 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1084 )
1085
1086 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1087
1088 self.log.debug(
1089 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1090 )
1091
1092 return await self._exec_inspect_command(
1093 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1094 )
1095
1096 async def synchronize_repos(self, cluster_uuid: str):
1097
1098 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1099 try:
1100 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1101 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1102
1103 local_repo_list = await self.repo_list(cluster_uuid)
1104 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1105
1106 deleted_repo_list = []
1107 added_repo_dict = {}
1108
1109 # iterate over the list of repos in the database that should be
1110 # added if not present
1111 for repo_name, db_repo in db_repo_dict.items():
1112 try:
1113 # check if it is already present
1114 curr_repo_url = local_repo_dict.get(db_repo["name"])
1115 repo_id = db_repo.get("_id")
1116 if curr_repo_url != db_repo["url"]:
1117 if curr_repo_url:
1118 self.log.debug(
1119 "repo {} url changed, delete and and again".format(
1120 db_repo["url"]
1121 )
1122 )
1123 await self.repo_remove(cluster_uuid, db_repo["name"])
1124 deleted_repo_list.append(repo_id)
1125
1126 # add repo
1127 self.log.debug("add repo {}".format(db_repo["name"]))
1128 if "ca_cert" in db_repo:
1129 await self.repo_add(
1130 cluster_uuid,
1131 db_repo["name"],
1132 db_repo["url"],
1133 cert=db_repo["ca_cert"],
1134 )
1135 else:
1136 await self.repo_add(
1137 cluster_uuid,
1138 db_repo["name"],
1139 db_repo["url"],
1140 )
1141 added_repo_dict[repo_id] = db_repo["name"]
1142 except Exception as e:
1143 raise K8sException(
1144 "Error adding repo id: {}, err_msg: {} ".format(
1145 repo_id, repr(e)
1146 )
1147 )
1148
1149 # Delete repos that are present but not in nbi_list
1150 for repo_name in local_repo_dict:
1151 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1152 self.log.debug("delete repo {}".format(repo_name))
1153 try:
1154 await self.repo_remove(cluster_uuid, repo_name)
1155 deleted_repo_list.append(repo_name)
1156 except Exception as e:
1157 self.warning(
1158 "Error deleting repo, name: {}, err_msg: {}".format(
1159 repo_name, str(e)
1160 )
1161 )
1162
1163 return deleted_repo_list, added_repo_dict
1164
1165 except K8sException:
1166 raise
1167 except Exception as e:
1168 # Do not raise errors synchronizing repos
1169 self.log.error("Error synchronizing repos: {}".format(e))
1170 raise Exception("Error synchronizing repos: {}".format(e))
1171
1172 def _get_db_repos_dict(self, repo_ids: list):
1173 db_repos_dict = {}
1174 for repo_id in repo_ids:
1175 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1176 db_repos_dict[db_repo["name"]] = db_repo
1177 return db_repos_dict
1178
1179 """
1180 ####################################################################################
1181 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1182 ####################################################################################
1183 """
1184
1185 @abc.abstractmethod
1186 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1187 """
1188 Creates and returns base cluster and kube dirs and returns them.
1189 Also created helm3 dirs according to new directory specification, paths are
1190 not returned but assigned to helm environment variables
1191
1192 :param cluster_name: cluster_name
1193 :return: Dictionary with config_paths and dictionary with helm environment variables
1194 """
1195
1196 @abc.abstractmethod
1197 async def _cluster_init(self, cluster_id, namespace, paths, env):
1198 """
1199 Implements the helm version dependent cluster initialization
1200 """
1201
1202 @abc.abstractmethod
1203 async def _instances_list(self, cluster_id):
1204 """
1205 Implements the helm version dependent helm instances list
1206 """
1207
1208 @abc.abstractmethod
1209 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1210 """
1211 Implements the helm version dependent method to obtain services from a helm instance
1212 """
1213
1214 @abc.abstractmethod
1215 async def _status_kdu(
1216 self,
1217 cluster_id: str,
1218 kdu_instance: str,
1219 namespace: str = None,
1220 show_error_log: bool = False,
1221 return_text: bool = False,
1222 ):
1223 """
1224 Implements the helm version dependent method to obtain status of a helm instance
1225 """
1226
1227 @abc.abstractmethod
1228 def _get_install_command(
1229 self,
1230 kdu_model,
1231 kdu_instance,
1232 namespace,
1233 params_str,
1234 version,
1235 atomic,
1236 timeout,
1237 kubeconfig,
1238 ) -> str:
1239 """
1240 Obtain command to be executed to delete the indicated instance
1241 """
1242
1243 @abc.abstractmethod
1244 def _get_upgrade_scale_command(
1245 self,
1246 kdu_model,
1247 kdu_instance,
1248 namespace,
1249 count,
1250 version,
1251 atomic,
1252 replicas,
1253 timeout,
1254 resource_name,
1255 kubeconfig,
1256 ) -> str:
1257 """Obtain command to be executed to upgrade the indicated instance."""
1258
1259 @abc.abstractmethod
1260 def _get_upgrade_command(
1261 self,
1262 kdu_model,
1263 kdu_instance,
1264 namespace,
1265 params_str,
1266 version,
1267 atomic,
1268 timeout,
1269 kubeconfig,
1270 ) -> str:
1271 """
1272 Obtain command to be executed to upgrade the indicated instance
1273 """
1274
1275 @abc.abstractmethod
1276 def _get_rollback_command(
1277 self, kdu_instance, namespace, revision, kubeconfig
1278 ) -> str:
1279 """
1280 Obtain command to be executed to rollback the indicated instance
1281 """
1282
1283 @abc.abstractmethod
1284 def _get_uninstall_command(
1285 self, kdu_instance: str, namespace: str, kubeconfig: str
1286 ) -> str:
1287 """
1288 Obtain command to be executed to delete the indicated instance
1289 """
1290
1291 @abc.abstractmethod
1292 def _get_inspect_command(
1293 self, show_command: str, kdu_model: str, repo_str: str, version: str
1294 ):
1295 """
1296 Obtain command to be executed to obtain information about the kdu
1297 """
1298
1299 @abc.abstractmethod
1300 def _get_get_command(
1301 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1302 ):
1303 """Obtain command to be executed to get information about the kdu instance."""
1304
1305 @abc.abstractmethod
1306 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1307 """
1308 Method call to uninstall cluster software for helm. This method is dependent
1309 of helm version
1310 For Helm v2 it will be called when Tiller must be uninstalled
1311 For Helm v3 it does nothing and does not need to be callled
1312 """
1313
1314 @abc.abstractmethod
1315 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1316 """
1317 Obtains the cluster repos identifiers
1318 """
1319
1320 """
1321 ####################################################################################
1322 ################################### P R I V A T E ##################################
1323 ####################################################################################
1324 """
1325
1326 @staticmethod
1327 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1328 if os.path.exists(filename):
1329 return True
1330 else:
1331 msg = "File {} does not exist".format(filename)
1332 if exception_if_not_exists:
1333 raise K8sException(msg)
1334
1335 @staticmethod
1336 def _remove_multiple_spaces(strobj):
1337 strobj = strobj.strip()
1338 while " " in strobj:
1339 strobj = strobj.replace(" ", " ")
1340 return strobj
1341
1342 @staticmethod
1343 def _output_to_lines(output: str) -> list:
1344 output_lines = list()
1345 lines = output.splitlines(keepends=False)
1346 for line in lines:
1347 line = line.strip()
1348 if len(line) > 0:
1349 output_lines.append(line)
1350 return output_lines
1351
1352 @staticmethod
1353 def _output_to_table(output: str) -> list:
1354 output_table = list()
1355 lines = output.splitlines(keepends=False)
1356 for line in lines:
1357 line = line.replace("\t", " ")
1358 line_list = list()
1359 output_table.append(line_list)
1360 cells = line.split(sep=" ")
1361 for cell in cells:
1362 cell = cell.strip()
1363 if len(cell) > 0:
1364 line_list.append(cell)
1365 return output_table
1366
1367 @staticmethod
1368 def _parse_services(output: str) -> list:
1369 lines = output.splitlines(keepends=False)
1370 services = []
1371 for line in lines:
1372 line = line.replace("\t", " ")
1373 cells = line.split(sep=" ")
1374 if len(cells) > 0 and cells[0].startswith("service/"):
1375 elems = cells[0].split(sep="/")
1376 if len(elems) > 1:
1377 services.append(elems[1])
1378 return services
1379
1380 @staticmethod
1381 def _get_deep(dictionary: dict, members: tuple):
1382 target = dictionary
1383 value = None
1384 try:
1385 for m in members:
1386 value = target.get(m)
1387 if not value:
1388 return None
1389 else:
1390 target = value
1391 except Exception:
1392 pass
1393 return value
1394
1395 # find key:value in several lines
1396 @staticmethod
1397 def _find_in_lines(p_lines: list, p_key: str) -> str:
1398 for line in p_lines:
1399 try:
1400 if line.startswith(p_key + ":"):
1401 parts = line.split(":")
1402 the_value = parts[1].strip()
1403 return the_value
1404 except Exception:
1405 # ignore it
1406 pass
1407 return None
1408
1409 @staticmethod
1410 def _lower_keys_list(input_list: list):
1411 """
1412 Transform the keys in a list of dictionaries to lower case and returns a new list
1413 of dictionaries
1414 """
1415 new_list = []
1416 if input_list:
1417 for dictionary in input_list:
1418 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1419 new_list.append(new_dict)
1420 return new_list
1421
1422 async def _local_async_exec(
1423 self,
1424 command: str,
1425 raise_exception_on_error: bool = False,
1426 show_error_log: bool = True,
1427 encode_utf8: bool = False,
1428 env: dict = None,
1429 ) -> (str, int):
1430
1431 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1432 self.log.debug(
1433 "Executing async local command: {}, env: {}".format(command, env)
1434 )
1435
1436 # split command
1437 command = shlex.split(command)
1438
1439 environ = os.environ.copy()
1440 if env:
1441 environ.update(env)
1442
1443 try:
1444 process = await asyncio.create_subprocess_exec(
1445 *command,
1446 stdout=asyncio.subprocess.PIPE,
1447 stderr=asyncio.subprocess.PIPE,
1448 env=environ,
1449 )
1450
1451 # wait for command terminate
1452 stdout, stderr = await process.communicate()
1453
1454 return_code = process.returncode
1455
1456 output = ""
1457 if stdout:
1458 output = stdout.decode("utf-8").strip()
1459 # output = stdout.decode()
1460 if stderr:
1461 output = stderr.decode("utf-8").strip()
1462 # output = stderr.decode()
1463
1464 if return_code != 0 and show_error_log:
1465 self.log.debug(
1466 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1467 )
1468 else:
1469 self.log.debug("Return code: {}".format(return_code))
1470
1471 if raise_exception_on_error and return_code != 0:
1472 raise K8sException(output)
1473
1474 if encode_utf8:
1475 output = output.encode("utf-8").strip()
1476 output = str(output).replace("\\n", "\n")
1477
1478 return output, return_code
1479
1480 except asyncio.CancelledError:
1481 raise
1482 except K8sException:
1483 raise
1484 except Exception as e:
1485 msg = "Exception executing command: {} -> {}".format(command, e)
1486 self.log.error(msg)
1487 if raise_exception_on_error:
1488 raise K8sException(e) from e
1489 else:
1490 return "", -1
1491
1492 async def _local_async_exec_pipe(
1493 self,
1494 command1: str,
1495 command2: str,
1496 raise_exception_on_error: bool = True,
1497 show_error_log: bool = True,
1498 encode_utf8: bool = False,
1499 env: dict = None,
1500 ):
1501
1502 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1503 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1504 command = "{} | {}".format(command1, command2)
1505 self.log.debug(
1506 "Executing async local command: {}, env: {}".format(command, env)
1507 )
1508
1509 # split command
1510 command1 = shlex.split(command1)
1511 command2 = shlex.split(command2)
1512
1513 environ = os.environ.copy()
1514 if env:
1515 environ.update(env)
1516
1517 try:
1518 read, write = os.pipe()
1519 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1520 os.close(write)
1521 process_2 = await asyncio.create_subprocess_exec(
1522 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1523 )
1524 os.close(read)
1525 stdout, stderr = await process_2.communicate()
1526
1527 return_code = process_2.returncode
1528
1529 output = ""
1530 if stdout:
1531 output = stdout.decode("utf-8").strip()
1532 # output = stdout.decode()
1533 if stderr:
1534 output = stderr.decode("utf-8").strip()
1535 # output = stderr.decode()
1536
1537 if return_code != 0 and show_error_log:
1538 self.log.debug(
1539 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1540 )
1541 else:
1542 self.log.debug("Return code: {}".format(return_code))
1543
1544 if raise_exception_on_error and return_code != 0:
1545 raise K8sException(output)
1546
1547 if encode_utf8:
1548 output = output.encode("utf-8").strip()
1549 output = str(output).replace("\\n", "\n")
1550
1551 return output, return_code
1552 except asyncio.CancelledError:
1553 raise
1554 except K8sException:
1555 raise
1556 except Exception as e:
1557 msg = "Exception executing command: {} -> {}".format(command, e)
1558 self.log.error(msg)
1559 if raise_exception_on_error:
1560 raise K8sException(e) from e
1561 else:
1562 return "", -1
1563
1564 async def _get_service(self, cluster_id, service_name, namespace):
1565 """
1566 Obtains the data of the specified service in the k8cluster.
1567
1568 :param cluster_id: id of a K8s cluster known by OSM
1569 :param service_name: name of the K8s service in the specified namespace
1570 :param namespace: K8s namespace used by the KDU instance
1571 :return: If successful, it will return a service with the following data:
1572 - `name` of the service
1573 - `type` type of service in the k8 cluster
1574 - `ports` List of ports offered by the service, for each port includes at least
1575 name, port, protocol
1576 - `cluster_ip` Internal ip to be used inside k8s cluster
1577 - `external_ip` List of external ips (in case they are available)
1578 """
1579
1580 # init config, env
1581 paths, env = self._init_paths_env(
1582 cluster_name=cluster_id, create_if_not_exist=True
1583 )
1584
1585 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1586 self.kubectl_command, paths["kube_config"], namespace, service_name
1587 )
1588
1589 output, _rc = await self._local_async_exec(
1590 command=command, raise_exception_on_error=True, env=env
1591 )
1592
1593 data = yaml.load(output, Loader=yaml.SafeLoader)
1594
1595 service = {
1596 "name": service_name,
1597 "type": self._get_deep(data, ("spec", "type")),
1598 "ports": self._get_deep(data, ("spec", "ports")),
1599 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1600 }
1601 if service["type"] == "LoadBalancer":
1602 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1603 ip_list = [elem["ip"] for elem in ip_map_list]
1604 service["external_ip"] = ip_list
1605
1606 return service
1607
1608 async def _exec_get_command(
1609 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1610 ):
1611 """Obtains information about the kdu instance."""
1612
1613 full_command = self._get_get_command(
1614 get_command, kdu_instance, namespace, kubeconfig
1615 )
1616
1617 output, _rc = await self._local_async_exec(command=full_command)
1618
1619 return output
1620
1621 async def _exec_inspect_command(
1622 self, inspect_command: str, kdu_model: str, repo_url: str = None
1623 ):
1624 """Obtains information about a kdu, no cluster (no env)."""
1625
1626 repo_str = ""
1627 if repo_url:
1628 repo_str = " --repo {}".format(repo_url)
1629
1630 idx = kdu_model.find("/")
1631 if idx >= 0:
1632 idx += 1
1633 kdu_model = kdu_model[idx:]
1634
1635 kdu_model, version = self._split_version(kdu_model)
1636 if version:
1637 version_str = "--version {}".format(version)
1638 else:
1639 version_str = ""
1640
1641 full_command = self._get_inspect_command(
1642 inspect_command, kdu_model, repo_str, version_str
1643 )
1644
1645 output, _rc = await self._local_async_exec(command=full_command)
1646
1647 return output
1648
1649 async def _get_replica_count_url(
1650 self,
1651 kdu_model: str,
1652 repo_url: str,
1653 resource_name: str = None,
1654 ):
1655 """Get the replica count value in the Helm Chart Values.
1656
1657 Args:
1658 kdu_model: The name or path of a bundle
1659 repo_url: Helm Chart repository url
1660 resource_name: Resource name
1661
1662 Returns:
1663 True if replicas, False replicaCount
1664 """
1665
1666 kdu_values = yaml.load(
1667 await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
1668 )
1669
1670 if not kdu_values:
1671 raise K8sException(
1672 "kdu_values not found for kdu_model {}".format(kdu_model)
1673 )
1674
1675 if resource_name:
1676 kdu_values = kdu_values.get(resource_name, None)
1677
1678 if not kdu_values:
1679 msg = "resource {} not found in the values in model {}".format(
1680 resource_name, kdu_model
1681 )
1682 self.log.error(msg)
1683 raise K8sException(msg)
1684
1685 duplicate_check = False
1686
1687 replica_str = ""
1688 replicas = None
1689
1690 if kdu_values.get("replicaCount", None):
1691 replicas = kdu_values["replicaCount"]
1692 replica_str = "replicaCount"
1693 elif kdu_values.get("replicas", None):
1694 duplicate_check = True
1695 replicas = kdu_values["replicas"]
1696 replica_str = "replicas"
1697 else:
1698 if resource_name:
1699 msg = (
1700 "replicaCount or replicas not found in the resource"
1701 "{} values in model {}. Cannot be scaled".format(
1702 resource_name, kdu_model
1703 )
1704 )
1705 else:
1706 msg = (
1707 "replicaCount or replicas not found in the values"
1708 "in model {}. Cannot be scaled".format(kdu_model)
1709 )
1710 self.log.error(msg)
1711 raise K8sException(msg)
1712
1713 # Control if replicas and replicaCount exists at the same time
1714 msg = "replicaCount and replicas are exists at the same time"
1715 if duplicate_check:
1716 if "replicaCount" in kdu_values:
1717 self.log.error(msg)
1718 raise K8sException(msg)
1719 else:
1720 if "replicas" in kdu_values:
1721 self.log.error(msg)
1722 raise K8sException(msg)
1723
1724 return replicas, replica_str
1725
1726 async def _get_replica_count_instance(
1727 self,
1728 kdu_instance: str,
1729 namespace: str,
1730 kubeconfig: str,
1731 resource_name: str = None,
1732 ):
1733 """Get the replica count value in the instance.
1734
1735 Args:
1736 kdu_instance: The name of the KDU instance
1737 namespace: KDU instance namespace
1738 kubeconfig:
1739 resource_name: Resource name
1740
1741 Returns:
1742 True if replicas, False replicaCount
1743 """
1744
1745 kdu_values = yaml.load(
1746 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1747 Loader=yaml.SafeLoader,
1748 )
1749
1750 replicas = None
1751
1752 if kdu_values:
1753 resource_values = (
1754 kdu_values.get(resource_name, None) if resource_name else None
1755 )
1756 replicas = (
1757 (
1758 resource_values.get("replicaCount", None)
1759 or resource_values.get("replicas", None)
1760 )
1761 if resource_values
1762 else (
1763 kdu_values.get("replicaCount", None)
1764 or kdu_values.get("replicas", None)
1765 )
1766 )
1767
1768 return replicas
1769
1770 async def _store_status(
1771 self,
1772 cluster_id: str,
1773 operation: str,
1774 kdu_instance: str,
1775 namespace: str = None,
1776 check_every: float = 10,
1777 db_dict: dict = None,
1778 run_once: bool = False,
1779 ):
1780 while True:
1781 try:
1782 await asyncio.sleep(check_every)
1783 detailed_status = await self._status_kdu(
1784 cluster_id=cluster_id,
1785 kdu_instance=kdu_instance,
1786 namespace=namespace,
1787 return_text=False,
1788 )
1789 status = detailed_status.get("info").get("description")
1790 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1791 # write status to db
1792 result = await self.write_app_status_to_db(
1793 db_dict=db_dict,
1794 status=str(status),
1795 detailed_status=str(detailed_status),
1796 operation=operation,
1797 )
1798 if not result:
1799 self.log.info("Error writing in database. Task exiting...")
1800 return
1801 except asyncio.CancelledError:
1802 self.log.debug("Task cancelled")
1803 return
1804 except Exception as e:
1805 self.log.debug(
1806 "_store_status exception: {}".format(str(e)), exc_info=True
1807 )
1808 pass
1809 finally:
1810 if run_once:
1811 return
1812
1813 # params for use in -f file
1814 # returns values file option and filename (in order to delete it at the end)
1815 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1816
1817 if params and len(params) > 0:
1818 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1819
1820 def get_random_number():
1821 r = random.randrange(start=1, stop=99999999)
1822 s = str(r)
1823 while len(s) < 10:
1824 s = "0" + s
1825 return s
1826
1827 params2 = dict()
1828 for key in params:
1829 value = params.get(key)
1830 if "!!yaml" in str(value):
1831 value = yaml.load(value[7:])
1832 params2[key] = value
1833
1834 values_file = get_random_number() + ".yaml"
1835 with open(values_file, "w") as stream:
1836 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1837
1838 return "-f {}".format(values_file), values_file
1839
1840 return "", None
1841
1842 # params for use in --set option
1843 @staticmethod
1844 def _params_to_set_option(params: dict) -> str:
1845 params_str = ""
1846 if params and len(params) > 0:
1847 start = True
1848 for key in params:
1849 value = params.get(key, None)
1850 if value is not None:
1851 if start:
1852 params_str += "--set "
1853 start = False
1854 else:
1855 params_str += ","
1856 params_str += "{}={}".format(key, value)
1857 return params_str
1858
1859 @staticmethod
1860 def generate_kdu_instance_name(**kwargs):
1861 chart_name = kwargs["kdu_model"]
1862 # check embeded chart (file or dir)
1863 if chart_name.startswith("/"):
1864 # extract file or directory name
1865 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1866 # check URL
1867 elif "://" in chart_name:
1868 # extract last portion of URL
1869 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1870
1871 name = ""
1872 for c in chart_name:
1873 if c.isalpha() or c.isnumeric():
1874 name += c
1875 else:
1876 name += "-"
1877 if len(name) > 35:
1878 name = name[0:35]
1879
1880 # if does not start with alpha character, prefix 'a'
1881 if not name[0].isalpha():
1882 name = "a" + name
1883
1884 name += "-"
1885
1886 def get_random_number():
1887 r = random.randrange(start=1, stop=99999999)
1888 s = str(r)
1889 s = s.rjust(10, "0")
1890 return s
1891
1892 name = name + get_random_number()
1893 return name.lower()
1894
1895 def _split_version(self, kdu_model: str) -> (str, str):
1896 version = None
1897 if ":" in kdu_model:
1898 parts = kdu_model.split(sep=":")
1899 if len(parts) == 2:
1900 version = str(parts[1])
1901 kdu_model = parts[0]
1902 return kdu_model, version
1903
1904 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1905 repo_url = None
1906 idx = kdu_model.find("/")
1907 if idx >= 0:
1908 repo_name = kdu_model[:idx]
1909 # Find repository link
1910 local_repo_list = await self.repo_list(cluster_uuid)
1911 for repo in local_repo_list:
1912 repo_url = repo["url"] if repo["name"] == repo_name else None
1913 return repo_url