Skip exception if model doesn't exist after delete failure
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Set up the state shared by the helm based K8s connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # the parent connector receives the database, logger and update callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # seed the random generator (random numbers for release name generation)
    random.seed(time.time())

    # the file system
    self.fs = fs

    # both executables are checked with exception_if_not_exists=True
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url comes from the config; the literal string "None" means unset
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
92
def _get_namespace(self, cluster_uuid: str) -> str:
    """
    Obtains the namespace used by the cluster with the uuid passed by argument

    :param cluster_uuid: cluster's uuid
    :return: the value stored under the "namespace" key of the cluster record,
        or None when no record is returned or the record has no such key
    """

    # the lookup is done with fail_on_empty=False, so an empty result has to be
    # tolerated here instead of failing with AttributeError on `.get`
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    if not k8scluster:
        return None
    return k8scluster.get("namespace")
105
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and the value returned by the version
        specific self._cluster_init (True if connector has installed some
        software in the cluster)
        (on error, an exception will be raised)
    """

    # a new uuid is generated unless the caller asks to reuse an existing one
    cluster_id = reuse_cluster_uuid or str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The credentials file must never be readable by other users. The third
    # positional argument of the builtin open() is `buffering`, not the file
    # mode, so the file is created through os.open() with owner-only
    # permissions instead.
    kube_config = paths["kube_config"]
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(kube_config, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as f:
        # a file that already existed keeps its previous permissions on open,
        # so they are tightened before the credentials are written
        os.chmod(kube_config, mode)
        f.write(k8s_creds)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
153
async def repo_add(
    self,
    cluster_uuid: str,
    name: str,
    url: str,
    repo_type: str = "chart",
    cert: str = None,
    user: str = None,
    password: str = None,
):
    """
    Add a helm repository to the helm configuration of a cluster.

    A "helm repo update" is executed first (its errors are not raised) and then
    "helm repo add <name> <url>" is executed (its errors are raised).

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name given to the repository
    :param url: url of the repository
    :param repo_type: only used in the log message
    :param cert: optional CA certificate content; it is written to
        <fs.path>/<cluster_uuid>/helmcerts/temp.crt and passed with --ca-file
    :param user: optional user name, passed with --username
    :param password: optional password, passed with --password (never logged)
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    command = "env KUBECONFIG={} {} repo update".format(
        paths["kube_config"], self._helm_command
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    command = ("env KUBECONFIG={} {} repo add {} {}").format(
        paths["kube_config"], self._helm_command, name, url
    )

    if cert:
        temp_cert_file = os.path.join(
            self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
        )
        os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
        with open(temp_cert_file, "w") as the_cert:
            the_cert.write(cert)
        command += " --ca-file {}".format(temp_cert_file)

    if user:
        command += " --username={}".format(user)

    # the logged command is kept separate so the password never reaches the log
    loggable_command = command
    if password:
        command += " --password={}".format(password)
        loggable_command += " --password=******"

    self.log.debug("adding repo: {}".format(loggable_command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
214
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    The output of "helm repo list --output yaml" is parsed and passed through
    self._lower_keys_list. A non zero return code or an empty output gives an
    empty list.

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :return: list of registered repositories: [ (name, url) .... ]
    """

    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    list_command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # errors are not raised: when there are no repos an empty list is wanted
    stdout, return_code = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    if return_code != 0:
        return []
    if not stdout or len(stdout) == 0:
        return []

    # unify format between helm2 and helm3 setting all keys lowercase
    parsed_repos = yaml.load(stdout, Loader=yaml.SafeLoader)
    return self._lower_keys_list(parsed_repos)
253
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the helm configuration of a cluster.

    Executes "helm repo remove <name>"; execution errors are raised.

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name of the repository to remove
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # environment and paths of the cluster
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # bring the local directory up to date before touching it
    self.fs.sync(from_path=cluster_uuid)

    kube_config = paths["kube_config"]
    remove_command = "env KUBECONFIG={} {} repo remove {}".format(
        kube_config, self._helm_command, name
    )
    await self._local_async_exec(
        command=remove_command, raise_exception_on_error=True, env=env
    )

    # push the local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)
276
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
        (uninstall errors are only logged)
    :param uninstall_sw: when True, self._uninstall_sw is invoked for the cluster,
        unless releases exist and force is not set
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # bound before the try so the handler below can always
                    # report it, even when reading the release itself fails
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_uuid)
                self.log.warning(msg)
                uninstall_sw = (
                    False  # Allow to remove k8s cluster without removing Tiller
                )

    if uninstall_sw:
        await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
348
async def _install_impl(
    self,
    cluster_id: str,
    kdu_model: str,
    paths: dict,
    env: dict,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Build and execute the helm install command for a KDU instance.

    :param cluster_id: id of the cluster where the KDU is installed
    :param kdu_model: chart reference, split by self._split_version into model
        and version
    :param paths: replaced by the value computed by self._init_paths_env
    :param env: replaced by the value computed by self._init_paths_env
    :param kdu_instance: name of the release to install
    :param atomic: passed to the install command; when set, the status is also
        stored in a parallel task while the command is running
    :param timeout: passed to the install command
    :param params: values passed to self._params_to_file_option
    :param db_dict: passed to self._store_status
    :param kdu_name: not used by this method
    :param namespace: namespace passed to the install command and status calls
    :raises K8sException: when the command finishes with a non zero return code
    """
    # init env, paths (the received paths/env arguments are overwritten)
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    # version
    kdu_model, version = self._split_version(kdu_model)

    command = self._get_install_command(
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        paths["kube_config"],
    )

    self.log.debug("installing: {}".format(command))

    # arguments shared by every status write of this operation
    status_args = {
        "cluster_id": cluster_id,
        "kdu_instance": kdu_instance,
        "namespace": namespace,
        "db_dict": db_dict,
        "operation": "install",
    }

    if not atomic:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    else:
        # helm runs in one task while another task keeps writing the status
        exec_task = asyncio.ensure_future(
            self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        status_task = asyncio.ensure_future(
            self._store_status(run_once=False, **status_args)
        )

        # only the execution task is awaited; the status task is then cancelled
        await asyncio.wait([exec_task])
        status_task.cancel()

        output, rc = exec_task.result()

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status
    await self._store_status(run_once=True, check_every=0, **status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)
442
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance with the helm upgrade command.

    :param cluster_uuid: uuid of the cluster where the instance lives
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference, split by self._split_version into model
        and version
    :param atomic: passed to the upgrade command; when set, the status is also
        stored in a parallel task while the command is running
    :param timeout: passed to the upgrade command
    :param params: values passed to self._params_to_file_option
    :param db_dict: passed to self._store_status
    :return: the revision reported for the instance after the upgrade, or 0 when
        the instance is no longer found
    :raises K8sException: when the instance is not found or the command finishes
        with a non zero return code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the namespace of the release is taken from the instance information
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    # version
    kdu_model, version = self._split_version(kdu_model)

    release_namespace = instance_info["namespace"]
    command = self._get_upgrade_command(
        kdu_model,
        kdu_instance,
        release_namespace,
        params_str,
        version,
        atomic,
        timeout,
        paths["kube_config"],
    )

    self.log.debug("upgrading: {}".format(command))

    # arguments shared by every status write of this operation
    status_args = {
        "cluster_id": cluster_uuid,
        "kdu_instance": kdu_instance,
        "namespace": release_namespace,
        "db_dict": db_dict,
        "operation": "upgrade",
    }

    if not atomic:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    else:
        # helm runs in one task while another task keeps writing the status
        exec_task = asyncio.ensure_future(
            self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        status_task = asyncio.ensure_future(
            self._store_status(run_once=False, **status_args)
        )

        # only the execution task is awaited; the status task is then cancelled
        await asyncio.wait([exec_task])
        status_task.cancel()
        output, rc = exec_task.result()

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status
    await self._store_status(run_once=True, check_every=0, **status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    refreshed = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not refreshed:
        return 0
    revision = int(refreshed.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
558
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    cluster_uuid: str = None,
    kdu_model: str = None,
    atomic: bool = True,
    db_dict: dict = None,
    **kwargs,
):
    """Scale a resource in a Helm Chart.

    Args:
        kdu_instance: KDU instance name
        scale: Scale to which to set the resource
        resource_name: Resource name
        total_timeout: The time, in seconds, to wait
        cluster_uuid: The UUID of the cluster
        kdu_model: The chart reference
        atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        db_dict: Dictionary for any additional data
        kwargs: Additional parameters

    Returns:
        True if successful; failures are reported by raising K8sException
    """

    if resource_name:
        debug_msg = "scaling resource {} in model {} (cluster {})".format(
            resource_name, kdu_model, cluster_uuid
        )
    else:
        debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
    self.log.debug(debug_msg)

    # look for instance to obtain namespace
    # get_instance_info function calls the sync command
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # version
    kdu_model, version = self._split_version(kdu_model)

    repo_url = await self._find_repo(kdu_model, cluster_uuid)
    if not repo_url:
        raise K8sException(
            "Repository not found for kdu_model {}".format(kdu_model)
        )

    # only the second element of the result is needed to build the command
    _, replica_str = await self._get_replica_count_url(
        kdu_model, repo_url, resource_name
    )

    release_namespace = instance_info["namespace"]
    command = self._get_upgrade_scale_command(
        kdu_model,
        kdu_instance,
        release_namespace,
        scale,
        version,
        atomic,
        replica_str,
        total_timeout,
        resource_name,
        paths["kube_config"],
    )

    self.log.debug("scaling: {}".format(command))

    # arguments shared by every status write of this operation
    status_args = {
        "cluster_id": cluster_uuid,
        "kdu_instance": kdu_instance,
        "namespace": release_namespace,
        "db_dict": db_dict,
        "operation": "scale",
    }

    if not atomic:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    else:
        # helm runs in one task while another task keeps writing the status
        exec_task = asyncio.ensure_future(
            self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        status_task = asyncio.ensure_future(
            self._store_status(run_once=False, **status_args)
        )

        # only the execution task is awaited; the status task is then cancelled
        await asyncio.wait([exec_task])
        status_task.cancel()
        output, rc = exec_task.result()

    # write final status
    await self._store_status(run_once=True, check_every=0, **status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return True
687
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    cluster_uuid: str,
    kdu_model: str,
    **kwargs,
) -> int:
    """Get a resource scale count.

    The count is first looked up in the deployed instance; when that lookup
    gives a falsy value, the value obtained from the chart repository is used.

    Args:
        cluster_uuid: The UUID of the cluster
        resource_name: Resource name
        kdu_instance: KDU instance name
        kdu_model: The name or path of a bundle
        kwargs: Additional parameters

    Returns:
        Resource instance count

    Raises:
        K8sException: when the instance or the repository is not found, or no
            replica count can be obtained
    """

    self.log.debug(
        "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
    )

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    instance_replicas = await self._get_replica_count_instance(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )
    if instance_replicas:
        return int(instance_replicas)

    # Get default value if scale count is not found from provided values
    repo_url = await self._find_repo(kdu_model, cluster_uuid)
    if not repo_url:
        raise K8sException(
            "Repository not found for kdu_model {}".format(kdu_model)
        )

    default_replicas, _ = await self._get_replica_count_url(
        kdu_model, repo_url, resource_name
    )
    if not default_replicas:
        msg = "Replica count not found. Cannot be scaled"
        self.log.error(msg)
        raise K8sException(msg)

    return int(default_replicas)
745
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll back a KDU instance to a given revision.

    The helm command runs in one task while the status is stored in a parallel
    task; a final status is stored once the command has finished.

    :param cluster_uuid: uuid of the cluster where the instance lives
    :param kdu_instance: name of the release to roll back
    :param revision: revision passed to the rollback command
    :param db_dict: passed to self._store_status
    :return: the revision reported for the instance after the rollback, or 0
        when the instance is no longer found
    :raises K8sException: when the instance is not found or the command finishes
        with a non zero return code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    release_namespace = instance_info["namespace"]
    command = self._get_rollback_command(
        kdu_instance, release_namespace, revision, paths["kube_config"]
    )

    self.log.debug("rolling_back: {}".format(command))

    # arguments shared by every status write of this operation
    status_args = {
        "cluster_id": cluster_uuid,
        "kdu_instance": kdu_instance,
        "namespace": release_namespace,
        "db_dict": db_dict,
        "operation": "rollback",
    }

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        self._store_status(run_once=False, **status_args)
    )

    # only the execution task is awaited; the status task is then cancelled
    await asyncio.wait([exec_task])
    status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(run_once=True, check_every=0, **status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    refreshed = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not refreshed:
        return 0
    revision = int(refreshed.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
832
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance is not found (nothing to remove); otherwise
        the command output converted by self._output_to_table
    """

    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the namespace of the release is taken from the instance information
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        # a missing instance is not an error: only a warning is logged
        self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    uninstall_command = self._get_uninstall_command(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )
    stdout, _rc = await self._local_async_exec(
        command=uninstall_command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(stdout)
878
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    The work is delegated to self._instances_list, wrapped between a sync of
    the local directory and a reverse sync.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the value returned by self._instances_list
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # local directory is synchronized before the internal command runs ...
    self.fs.sync(from_path=cluster_uuid)
    releases = await self._instances_list(cluster_uuid)
    # ... and pushed back afterwards
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
899
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look for a release by name among the releases of a cluster.

    :param cluster_uuid: cluster passed to self.instances_list
    :param kdu_instance: name of the release to look for
    :return: the first release whose "name" equals kdu_instance, or None
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    match = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if match is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return match
907
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    This connector does not implement primitives: the call always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    unsupported_msg = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported_msg)
934
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    The names are obtained with self._get_services and each of them is then
    resolved, one after another, with self._get_service.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get list of services names for kdu
    service_names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    # the services are fetched sequentially, in the order of their names
    service_list = [
        await self._get_service(cluster_uuid, service_name, namespace)
        for service_name in service_names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
982
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of a single service.

    The work is delegated to self._get_service, wrapped between a sync of the
    local directory and a reverse sync.

    :param cluster_uuid: UUID of the K8s cluster
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: the value returned by self._get_service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # local directory is synchronized before the query ...
    self.fs.sync(from_path=cluster_uuid)
    service_data = await self._get_service(cluster_uuid, service_name, namespace)
    # ... and pushed back afterwards
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_data
1002
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
) -> Union[str, dict]:
    """
    This call retrieves the current state of a given KDU instance. It allows
    retrieving the _composition_ (i.e. K8s objects) and _specific values_ of the
    configuration parameters applied to a given instance. This call is based on
    the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :param yaml_format: if the return shall be returned as an YAML string or as a
        dictionary
    :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
            - UNKNOWN
            - DEPLOYED
            - DELETED
            - SUPERSEDED
            - FAILED or
            - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
          and the status of those resources
        - Last `deployment_time`.
    :raises K8sException: when the instance is not found in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    releases = await self._instances_list(cluster_id=cluster_uuid)
    release = next(
        (item for item in releases if item.get("name") == kdu_instance), None
    )
    if release is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
1065
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain the values of a deployed KDU instance.

    :param kdu_instance: name of the release
    :param namespace: namespace of the release
    :param kubeconfig: kubeconfig passed to the get command
    :return: the result of self._exec_get_command with get_command="values"
    """

    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    values = await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
    return values
1078
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a KDU model.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository url
    :return: the result of self._exec_inspect_command with
        inspect_command="values"
    """

    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    inspected = await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspected
1090
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a KDU model.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository url
    :return: the result of self._exec_inspect_command with
        inspect_command="readme"
    """

    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
1100
async def synchronize_repos(self, cluster_uuid: str):
    """
    Make the helm repositories of a cluster match the ones stored in the database.

    Repositories present in the database but missing locally (or registered
    with a different url) are added again; local repositories not present in
    the database, other than "stable", are removed.

    :param cluster_uuid: uuid of the cluster to synchronize
    :return: tuple (deleted_repo_list, added_repo_dict). deleted_repo_list holds
        the database ids of the repos removed because their url changed and the
        names of the local repos removed because they are not in the database;
        added_repo_dict maps the database id of each added repo to its name
    :raises K8sException: when a repo from the database cannot be added
    :raises Exception: on any other error while synchronizing
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # read before the try block so the error message below can
            # always report it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and and again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo (the certificate is only passed when present)
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    cert_args = {}
                    if "ca_cert" in db_repo:
                        cert_args["cert"] = db_repo["ca_cert"]
                    await self.repo_add(
                        cluster_uuid,
                        db_repo["name"],
                        db_repo["url"],
                        **cert_args,
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # a repo that cannot be deleted must not abort the
                    # synchronization: it is only reported through the logger
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other error is logged and reported as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
1176
def _get_db_repos_dict(self, repo_ids: list):
    """
    Load the given repos from the ``k8srepos`` collection.

    :param repo_ids: list of repo ``_id`` values to look up, one query each
    :return: dictionary mapping each repo ``name`` to its database record
    """
    records = (self.db.get_one("k8srepos", {"_id": rid}) for rid in repo_ids)
    return {record["name"]: record for record in records}
1183
1184 """
1185 ####################################################################################
1186 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1187 ####################################################################################
1188 """
1189
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Create (if needed) and return the base cluster and kube directories.

    Helm 3 directories are also created according to the new directory
    specification; those paths are not returned but assigned to helm
    environment variables.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably controls whether missing
        directories are created - confirm against the implementations
    :return: Dictionary with config_paths and dictionary with helm environment
        variables. Callers in this class unpack the result as ``paths, env``
        and read ``paths["kube_config"]``.
    """
1200
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization.

    :param cluster_id: identifier of the cluster to initialize
    :param namespace: namespace used for the initialization
    :param paths: presumably the paths dictionary returned by
        ``_init_paths_env`` - confirm against the caller
    :param env: presumably the environment dictionary returned by
        ``_init_paths_env`` - confirm against the caller
    """
1206
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list.

    :param cluster_id: identifier of the cluster whose instances are listed
    """
1212
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a
    helm instance.

    :param cluster_id: identifier of the cluster hosting the instance
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param kubeconfig: kubeconfig reference used to reach the cluster
    """
1218
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict]:
    """
    Implements the helm version dependent method to obtain status of a helm
    instance.

    :param cluster_id: identifier of the cluster hosting the instance
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param yaml_format: presumably selects a YAML string result instead of a
        dictionary - confirm against the implementations
    :param show_error_log: presumably forwarded to the command execution to
        control error logging - confirm against the implementations
    :return: status as a string or a dictionary; ``_store_status`` calls this
        with ``yaml_format=False`` and reads ``["info"]["description"]`` from
        the result
    """
1231
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance
    """
1247
@abc.abstractmethod
def _get_upgrade_scale_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    count,
    version,
    atomic,
    replicas,
    timeout,
    resource_name,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance.

    NOTE(review): judging by the name and the ``count``/``replicas``/
    ``resource_name`` parameters this is the upgrade used for scaling -
    confirm against the implementations.
    """
1263
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    :return: the command as a string
    """
1279
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    :return: the command as a string
    """
1287
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    :return: the command as a string
    """
1295
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain command to be executed to obtain information about the kdu

    :param show_command: inspect sub-command; this class passes "values" and
        "readme"
    :param kdu_model: chart name
    :param repo_str: ready-made repo option (" --repo <url>") or empty string,
        as built by ``_exec_inspect_command``
    :param version: ready-made version option ("--version <x>") or empty
        string, as built by ``_exec_inspect_command``
    :return: the command; ``_exec_inspect_command`` runs it through
        ``_local_async_exec``
    """
1303
@abc.abstractmethod
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Obtain command to be executed to get information about the kdu instance.

    :param get_command: get sub-command; this class passes "values"
    :param kdu_instance: name of the kdu instance
    :param namespace: namespace of the kdu instance
    :param kubeconfig: kubeconfig reference used to reach the cluster
    :return: the command; ``_exec_get_command`` runs it through
        ``_local_async_exec``
    """
1309
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is
    dependent on the helm version.
    For Helm v2 it will be called when Tiller must be uninstalled.
    For Helm v3 it does nothing and does not need to be called.
    """
1318
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    :param cluster_uuid: identifier of the cluster
    :return: list of repo identifiers; ``synchronize_repos`` hands it to
        ``_get_db_repos_dict``, which uses each one as the ``_id`` of a
        ``k8srepos`` record
    """
1324
1325 """
1326 ####################################################################################
1327 ################################### P R I V A T E ##################################
1328 ####################################################################################
1329 """
1330
@staticmethod
def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
    """
    Check whether a path exists on the local file system.

    :param filename: path to check
    :param exception_if_not_exists: raise instead of returning ``False`` when
        the path does not exist
    :return: ``True`` if the path exists, ``False`` otherwise
    :raises K8sException: if the path does not exist and
        ``exception_if_not_exists`` is set
    """
    if os.path.exists(filename):
        return True

    if exception_if_not_exists:
        raise K8sException("File {} does not exist".format(filename))
    # explicit falsy result instead of an implicit None
    return False
1339
@staticmethod
def _remove_multiple_spaces(strobj):
    """
    Strip the string and collapse every run of spaces into a single space.

    Only the space character is collapsed; other whitespace inside the string
    (tabs, newlines) is preserved.

    :param strobj: string to normalize
    :return: normalized string
    """
    # Splitting on a single space yields empty items for consecutive spaces;
    # dropping them and re-joining collapses each run without relying on a
    # whitespace-sensitive literal or a replace loop.
    return " ".join(piece for piece in strobj.strip().split(" ") if piece)
1346
@staticmethod
def _output_to_lines(output: str) -> list:
    """
    Split command output into its non-empty lines.

    :param output: text to split
    :return: list with every line stripped of surrounding whitespace; lines
        that are empty after stripping are dropped
    """
    stripped_lines = (raw.strip() for raw in output.splitlines())
    return [text for text in stripped_lines if text]
1356
@staticmethod
def _output_to_table(output: str) -> list:
    """
    Convert column-formatted command output into a list of rows.

    Tabs are treated as spaces and every line is split on the space character.

    :param output: text to parse
    :return: one list per input line (an empty list for a blank line), each
        holding the non-empty, stripped cells of that line
    """
    table = []
    for raw_line in output.splitlines():
        pieces = (chunk.strip() for chunk in raw_line.replace("\t", " ").split(" "))
        table.append([cell for cell in pieces if cell])
    return table
1371
@staticmethod
def _parse_services(output: str) -> list:
    """
    Extract service names from command output.

    A line contributes a service when its first space-separated token (tabs
    count as spaces) starts with ``service/``; the name is the text between
    the first and the second ``/`` of that token.

    :param output: text to parse
    :return: list of service names in order of appearance
    """
    found = []
    for raw_line in output.splitlines():
        first_token = raw_line.replace("\t", " ").split(" ")[0]
        if not first_token.startswith("service/"):
            continue
        found.append(first_token.split("/")[1])
    return found
1384
@staticmethod
def _get_deep(dictionary: dict, members: tuple):
    """
    Walk nested dictionaries following a path of keys.

    :param dictionary: root dictionary
    :param members: sequence of keys, outermost first
    :return: the value found at the end of the path, or ``None`` when the path
        is empty, a key is missing, any value along the path is falsy, or an
        intermediate value cannot be traversed
    """
    target = dictionary
    value = None
    for member in members:
        try:
            value = target.get(member)
        except (AttributeError, TypeError):
            # the path continues below something that is not a mapping: there
            # is nothing at the requested location
            return None
        if not value:
            return None
        target = value
    return value
1399
# find key:value in several lines
@staticmethod
def _find_in_lines(p_lines: list, p_key: str) -> str:
    """
    Return the value of the first ``key: value`` line matching the key.

    :param p_lines: lines to search; entries that are not strings are skipped
    :param p_key: key expected at the very start of the line, followed by ":"
    :return: everything after the ``key:`` prefix, stripped (the value may
        itself contain ":"), or ``None`` when no line matches
    """
    prefix = p_key + ":" if isinstance(p_key, str) else None
    if prefix is None:
        return None
    for line in p_lines:
        try:
            if line.startswith(prefix):
                # keep the whole remainder so values such as timestamps or
                # urls are not cut at their first colon
                return line[len(prefix):].strip()
        except (AttributeError, TypeError):
            # ignore entries that are not strings
            continue
    return None
1413
@staticmethod
def _lower_keys_list(input_list: list):
    """
    Build a new list of dictionaries whose keys are lower-cased.

    :param input_list: list of dictionaries; may be empty or ``None``
    :return: new list with one new dictionary per input dictionary; an empty
        list when the input is empty or ``None``
    """
    if not input_list:
        return []
    return [
        {key.lower(): val for key, val in entry.items()} for entry in input_list
    ]
1426
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command asynchronously and return its output.

    The command line is normalized, split with ``shlex`` and executed without
    a shell, with ``env`` layered on top of a copy of the current process
    environment.

    :param command: command line to execute
    :param raise_exception_on_error: raise instead of returning when the
        command exits with a non-zero code or cannot be executed
    :param show_error_log: include the command output in the debug log when
        the command fails
    :param encode_utf8: post-process the output through ``encode("utf-8")``
        and ``str()`` (literal "\\n" sequences are turned into newlines)
    :param env: extra environment variables for the command
    :return: tuple ``(output, return_code)``. On success the output is stdout
        (stderr only if stdout is empty); on failure stderr takes precedence.
        ``("", -1)`` is returned when the command could not be executed and
        ``raise_exception_on_error`` is not set.
    :raises K8sException: see ``raise_exception_on_error``
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        # stderr carries the error message of a failed command, but it must
        # not replace the real result of a successful one (tools may print
        # warnings on stderr and still exit with code 0)
        if stderr and (return_code != 0 or not output):
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1496
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Run ``command1 | command2`` locally without a shell and return the output.

    Both commands are normalized, split with ``shlex`` and connected through
    an OS pipe. Only the second command determines the result.

    :param command1: command whose stdout feeds the pipe
    :param command2: command reading the pipe on stdin
    :param raise_exception_on_error: raise instead of returning when the
        second command exits with a non-zero code or a command cannot be
        executed
    :param show_error_log: include the command output in the debug log when
        the second command fails
    :param encode_utf8: post-process the output through ``encode("utf-8")``
        and ``str()`` (literal "\\n" sequences are turned into newlines)
    :param env: extra environment variables for both commands
    :return: tuple ``(output, return_code)`` of the second command, or
        ``("", -1)`` when execution failed and ``raise_exception_on_error``
        is not set
    :raises K8sException: see ``raise_exception_on_error``
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read_fd, write_fd = os.pipe()
        try:
            try:
                await asyncio.create_subprocess_exec(
                    *command1, stdout=write_fd, env=environ
                )
            finally:
                # the parent must always drop its copy of the write end:
                # otherwise command2 never sees EOF, and the descriptor would
                # leak if the spawn failed
                os.close(write_fd)
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read_fd, stdout=asyncio.subprocess.PIPE, env=environ
            )
        finally:
            # released on success as well as when any of the spawns failed
            os.close(read_fd)

        # stderr of command2 is not piped, so only stdout is captured here
        stdout, _ = await process_2.communicate()

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1568
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, and empty while no ingress ip
    has been assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # the ingress list is missing until the load balancer is provisioned,
        # and an entry may carry only a hostname instead of an ip
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if "ip" in elem
        ]

    return service
1612
async def _exec_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Obtains information about the kdu instance.

    Builds the command with ``_get_get_command`` and runs it with
    ``_local_async_exec`` using that method's defaults.

    :param get_command: get sub-command to execute
    :param kdu_instance: name of the kdu instance
    :param namespace: namespace of the kdu instance
    :param kubeconfig: kubeconfig reference used to reach the cluster
    :return: output of the command (the return code is discarded)
    """
    built_command = self._get_get_command(
        get_command, kdu_instance, namespace, kubeconfig
    )
    command_output, _return_code = await self._local_async_exec(
        command=built_command
    )
    return command_output
1625
async def _exec_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Obtains information about a kdu, no cluster (no env).

    Everything up to and including the first "/" of ``kdu_model`` is dropped,
    an optional version is split off with ``_split_version`` and the command
    built by ``_get_inspect_command`` is run with ``_local_async_exec``.

    :param inspect_command: inspect sub-command to execute
    :param kdu_model: chart reference, optionally carrying a prefix ending in
        "/" and a version
    :param repo_url: optional repository url, passed as a "--repo" option
    :return: output of the command (the return code is discarded)
    """
    repo_str = " --repo {}".format(repo_url) if repo_url else ""

    # keep only what follows the first "/", when there is one
    _head, slash, remainder = kdu_model.partition("/")
    if slash:
        kdu_model = remainder

    kdu_model, version = self._split_version(kdu_model)
    version_str = "--version {}".format(version) if version else ""

    built_command = self._get_inspect_command(
        inspect_command, kdu_model, repo_str, version_str
    )
    command_output, _return_code = await self._local_async_exec(
        command=built_command
    )
    return command_output
1653
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str,
    resource_name: str = None,
):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of a bundle
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica key is looked up
            inside that entry of the values instead of at the top level

    Returns:
        Tuple ``(replicas, replica_str)``: the replica count found and the
        name of the key holding it ("replicaCount" or "replicas")

    Raises:
        K8sException: if the values are empty, the resource is not found, no
            non-zero replica key exists, or both keys are present
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
    )

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    if kdu_values.get("replicaCount", None):
        replica_str, other_key = "replicaCount", "replicas"
    elif kdu_values.get("replicas", None):
        replica_str, other_key = "replicas", "replicaCount"
    else:
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time: the key
    # that was not selected must not be present at all
    if other_key in kdu_values:
        msg = "replicaCount and replicas are exists at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    return kdu_values[replica_str], replica_str
1730
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
):
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: kubeconfig reference forwarded to ``get_values_kdu``
        resource_name: Resource name; its entry in the values is used when it
            exists and is not empty, otherwise the top-level values are used

    Returns:
        The value of "replicaCount" or, if that is missing or falsy, of
        "replicas"; ``None`` when the instance has no values
    """
    raw_values = await self.get_values_kdu(kdu_instance, namespace, kubeconfig)
    kdu_values = yaml.load(raw_values, Loader=yaml.SafeLoader)

    if not kdu_values:
        return None

    # look inside the resource entry when there is a usable one
    scope = kdu_values
    if resource_name:
        resource_values = kdu_values.get(resource_name, None)
        if resource_values:
            scope = resource_values

    return scope.get("replicaCount", None) or scope.get("replicas", None)
1774
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Periodically read the status of a KDU instance and write it to the db.

    Each iteration sleeps ``check_every`` seconds, reads the status with
    ``_status_kdu`` and stores it with ``write_app_status_to_db``. Errors in
    an iteration are logged and do not stop the loop.

    :param cluster_id: identifier of the cluster hosting the instance
    :param operation: operation name stored together with the status
    :param kdu_instance: name of the KDU instance
    :param namespace: namespace of the KDU instance
    :param check_every: seconds to wait before every status read
    :param db_dict: database location forwarded to ``write_app_status_to_db``
    :param run_once: perform a single iteration instead of looping
    :return: ``None``; the task ends after one iteration when ``run_once`` is
        set, when the db write reports failure, or when it is cancelled
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # a failed iteration must not kill the monitoring task
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )

        # checked after the try block rather than in a ``finally`` so that
        # exceptions the handlers above do not catch are not silently dropped
        if run_once:
            return
1817
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the params to a values file and return the option that uses it.

    String values containing the "!!yaml" marker are parsed as YAML (the first
    7 characters, i.e. the marker and its separator, are skipped) before being
    written.

    :param cluster_id: cluster whose paths/env are initialized before writing
    :param params: parameters to write; may be empty or ``None``
    :return: tuple ``("-f <file>", "<file>")`` so the caller can delete the
        file afterwards, or ``("", None)`` when there are no params. The file
        gets a random numeric name with ".yaml" extension and is opened with a
        relative path.
    """
    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    params2 = {}
    for key, value in params.items():
        # only strings can carry the marker; safe_load avoids building
        # arbitrary python objects and works with every PyYAML version
        if isinstance(value, str) and "!!yaml" in value:
            value = yaml.safe_load(value[7:])
        params2[key] = value

    random_name = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    values_file = random_name + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1846
# params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
    """
    Build the ``--set`` option for the given params.

    :param params: parameters to pass; entries whose value is ``None`` are
        skipped
    :return: ``--set k1=v1,k2=v2`` or an empty string when there is nothing
        to set
    """
    if not params:
        return ""

    assignments = [
        "{}={}".format(key, value)
        for key, value in params.items()
        if value is not None
    ]
    if not assignments:
        return ""
    return "--set " + ",".join(assignments)
1863
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Generate a random instance name derived from the chart name.

    For embedded charts (absolute path) and urls only the last path component
    is used. Every character that is not a letter or a digit becomes "-", the
    result is cut to 35 characters, prefixed with "a" when it is empty or does
    not start with a letter, and completed with "-" plus a random 10-digit,
    zero-padded number. The name is returned in lower case.

    :param kwargs: must contain ``kdu_model``, the chart reference
    :return: generated instance name
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep the last path component; a
    # trailing "/" must not leave an empty name
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )
    name = name[:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    random_suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_suffix).lower()
1899
def _split_version(self, kdu_model: str) -> (str, str):
    """
    Split a ``<chart>:<version>`` reference into chart and version.

    :param kdu_model: chart reference, optionally followed by ":<version>"
    :return: tuple ``(kdu_model, version)``; ``version`` is ``None`` and the
        model is returned unchanged when the reference is a url (contains
        "://") or does not contain exactly one ":"
    """
    version = None
    # the colon of a url scheme is not a version separator
    if "://" not in kdu_model and ":" in kdu_model:
        parts = kdu_model.split(sep=":")
        if len(parts) == 2:
            version = str(parts[1])
            kdu_model = parts[0]
    return kdu_model, version
1908
async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
    """
    Find the url of the repo referenced by a ``<repo>/<chart>`` model.

    :param kdu_model: chart reference; the text before the first "/" is taken
        as the repo name
    :param cluster_uuid: cluster whose repo list is searched
    :return: url of the first repo with that name, or ``None`` when the model
        has no "/" or no repo matches
    """
    idx = kdu_model.find("/")
    if idx < 0:
        return None

    repo_name = kdu_model[:idx]
    # Find repository link; stop at the first match so that later entries of
    # the list cannot overwrite the result
    local_repo_list = await self.repo_list(cluster_uuid)
    for repo in local_repo_list:
        if repo["name"] == repo_name:
            return repo["url"]
    return None