Fix bug 2088 by quoting inputs for commands
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 from shlex import quote
26 import random
27 import time
28 import shlex
29 import shutil
30 import stat
31 import os
32 import yaml
33 from uuid import uuid4
34
35 from n2vc.config import EnvironConfig
36 from n2vc.exceptions import K8sException
37 from n2vc.k8s_conn import K8sConnector
38 from n2vc.kubectl import Kubectl
39
40
41 class K8sHelmBaseConnector(K8sConnector):
42
43 """
44 ####################################################################################
45 ################################### P U B L I C ####################################
46 ####################################################################################
47 """
48
49 service_account = "osm"
50
51 def __init__(
52 self,
53 fs: object,
54 db: object,
55 kubectl_command: str = "/usr/bin/kubectl",
56 helm_command: str = "/usr/bin/helm",
57 log: object = None,
58 on_update_db=None,
59 ):
60 """
61
62 :param fs: file system for kubernetes and helm configuration
63 :param db: database object to write current operation status
64 :param kubectl_command: path to kubectl executable
65 :param helm_command: path to helm executable
66 :param log: logger
67 :param on_update_db: callback called when k8s connector updates database
68 """
69
70 # parent class
71 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
72
73 self.log.info("Initializing K8S Helm connector")
74
75 self.config = EnvironConfig()
76 # random numbers for release name generation
77 random.seed(time.time())
78
79 # the file system
80 self.fs = fs
81
82 # exception if kubectl is not installed
83 self.kubectl_command = kubectl_command
84 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
85
86 # exception if helm is not installed
87 self._helm_command = helm_command
88 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
89
90 # obtain stable repo url from config or apply default
91 self._stable_repo_url = self.config.get("stablerepourl")
92 if self._stable_repo_url == "None":
93 self._stable_repo_url = None
94
95 # Lock to avoid concurrent execution of helm commands
96 self.cmd_lock = asyncio.Lock()
97
98 def _get_namespace(self, cluster_uuid: str) -> str:
99 """
100 Obtains the namespace used by the cluster with the uuid passed by argument
101
102 param: cluster_uuid: cluster's uuid
103 """
104
105 # first, obtain the cluster corresponding to the uuid passed by argument
106 k8scluster = self.db.get_one(
107 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
108 )
109 return k8scluster.get("namespace")
110
111 async def init_env(
112 self,
113 k8s_creds: str,
114 namespace: str = "kube-system",
115 reuse_cluster_uuid=None,
116 **kwargs,
117 ) -> tuple[str, bool]:
118 """
119 It prepares a given K8s cluster environment to run Charts
120
121 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
122 '.kube/config'
123 :param namespace: optional namespace to be used for helm. By default,
124 'kube-system' will be used
125 :param reuse_cluster_uuid: existing cluster uuid for reuse
126 :param kwargs: Additional parameters (None yet)
127 :return: uuid of the K8s cluster and True if connector has installed some
128 software in the cluster
129 (on error, an exception will be raised)
130 """
131
132 if reuse_cluster_uuid:
133 cluster_id = reuse_cluster_uuid
134 else:
135 cluster_id = str(uuid4())
136
137 self.log.debug(
138 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
139 )
140
141 paths, env = self._init_paths_env(
142 cluster_name=cluster_id, create_if_not_exist=True
143 )
144 mode = stat.S_IRUSR | stat.S_IWUSR
145 with open(paths["kube_config"], "w", mode) as f:
146 f.write(k8s_creds)
147 os.chmod(paths["kube_config"], 0o600)
148
149 # Code with initialization specific of helm version
150 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
151
152 # sync fs with local data
153 self.fs.reverse_sync(from_path=cluster_id)
154
155 self.log.info("Cluster {} initialized".format(cluster_id))
156
157 return cluster_id, n2vc_installed_sw
158
159 async def repo_add(
160 self,
161 cluster_uuid: str,
162 name: str,
163 url: str,
164 repo_type: str = "chart",
165 cert: str = None,
166 user: str = None,
167 password: str = None,
168 ):
169 self.log.debug(
170 "Cluster {}, adding {} repository {}. URL: {}".format(
171 cluster_uuid, repo_type, name, url
172 )
173 )
174
175 # init_env
176 paths, env = self._init_paths_env(
177 cluster_name=cluster_uuid, create_if_not_exist=True
178 )
179
180 # sync local dir
181 self.fs.sync(from_path=cluster_uuid)
182
183 # helm repo add name url
184 command = ("env KUBECONFIG={} {} repo add {} {}").format(
185 paths["kube_config"], self._helm_command, quote(name), quote(url)
186 )
187
188 if cert:
189 temp_cert_file = os.path.join(
190 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
191 )
192 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
193 with open(temp_cert_file, "w") as the_cert:
194 the_cert.write(cert)
195 command += " --ca-file {}".format(quote(temp_cert_file))
196
197 if user:
198 command += " --username={}".format(quote(user))
199
200 if password:
201 command += " --password={}".format(quote(password))
202
203 self.log.debug("adding repo: {}".format(command))
204 await self._local_async_exec(
205 command=command, raise_exception_on_error=True, env=env
206 )
207
208 # helm repo update
209 command = "env KUBECONFIG={} {} repo update {}".format(
210 paths["kube_config"], self._helm_command, quote(name)
211 )
212 self.log.debug("updating repo: {}".format(command))
213 await self._local_async_exec(
214 command=command, raise_exception_on_error=False, env=env
215 )
216
217 # sync fs
218 self.fs.reverse_sync(from_path=cluster_uuid)
219
220 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
221 self.log.debug(
222 "Cluster {}, updating {} repository {}".format(
223 cluster_uuid, repo_type, name
224 )
225 )
226
227 # init_env
228 paths, env = self._init_paths_env(
229 cluster_name=cluster_uuid, create_if_not_exist=True
230 )
231
232 # sync local dir
233 self.fs.sync(from_path=cluster_uuid)
234
235 # helm repo update
236 command = "{} repo update {}".format(self._helm_command, quote(name))
237 self.log.debug("updating repo: {}".format(command))
238 await self._local_async_exec(
239 command=command, raise_exception_on_error=False, env=env
240 )
241
242 # sync fs
243 self.fs.reverse_sync(from_path=cluster_uuid)
244
245 async def repo_list(self, cluster_uuid: str) -> list:
246 """
247 Get the list of registered repositories
248
249 :return: list of registered repositories: [ (name, url) .... ]
250 """
251
252 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
253
254 # config filename
255 paths, env = self._init_paths_env(
256 cluster_name=cluster_uuid, create_if_not_exist=True
257 )
258
259 # sync local dir
260 self.fs.sync(from_path=cluster_uuid)
261
262 command = "env KUBECONFIG={} {} repo list --output yaml".format(
263 paths["kube_config"], self._helm_command
264 )
265
266 # Set exception to false because if there are no repos just want an empty list
267 output, _rc = await self._local_async_exec(
268 command=command, raise_exception_on_error=False, env=env
269 )
270
271 # sync fs
272 self.fs.reverse_sync(from_path=cluster_uuid)
273
274 if _rc == 0:
275 if output and len(output) > 0:
276 repos = yaml.load(output, Loader=yaml.SafeLoader)
277 # unify format between helm2 and helm3 setting all keys lowercase
278 return self._lower_keys_list(repos)
279 else:
280 return []
281 else:
282 return []
283
284 async def repo_remove(self, cluster_uuid: str, name: str):
285 self.log.debug(
286 "remove {} repositories for cluster {}".format(name, cluster_uuid)
287 )
288
289 # init env, paths
290 paths, env = self._init_paths_env(
291 cluster_name=cluster_uuid, create_if_not_exist=True
292 )
293
294 # sync local dir
295 self.fs.sync(from_path=cluster_uuid)
296
297 command = "env KUBECONFIG={} {} repo remove {}".format(
298 paths["kube_config"], self._helm_command, quote(name)
299 )
300 await self._local_async_exec(
301 command=command, raise_exception_on_error=True, env=env
302 )
303
304 # sync fs
305 self.fs.reverse_sync(from_path=cluster_uuid)
306
307 async def reset(
308 self,
309 cluster_uuid: str,
310 force: bool = False,
311 uninstall_sw: bool = False,
312 **kwargs,
313 ) -> bool:
314 """Reset a cluster
315
316 Resets the Kubernetes cluster by removing the helm deployment that represents it.
317
318 :param cluster_uuid: The UUID of the cluster to reset
319 :param force: Boolean to force the reset
320 :param uninstall_sw: Boolean to force the reset
321 :param kwargs: Additional parameters (None yet)
322 :return: Returns True if successful or raises an exception.
323 """
324 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
325 self.log.debug(
326 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
327 cluster_uuid, uninstall_sw
328 )
329 )
330
331 # sync local dir
332 self.fs.sync(from_path=cluster_uuid)
333
334 # uninstall releases if needed.
335 if uninstall_sw:
336 releases = await self.instances_list(cluster_uuid=cluster_uuid)
337 if len(releases) > 0:
338 if force:
339 for r in releases:
340 try:
341 kdu_instance = r.get("name")
342 chart = r.get("chart")
343 self.log.debug(
344 "Uninstalling {} -> {}".format(chart, kdu_instance)
345 )
346 await self.uninstall(
347 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
348 )
349 except Exception as e:
350 # will not raise exception as it was found
351 # that in some cases of previously installed helm releases it
352 # raised an error
353 self.log.warn(
354 "Error uninstalling release {}: {}".format(
355 kdu_instance, e
356 )
357 )
358 else:
359 msg = (
360 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
361 ).format(cluster_uuid)
362 self.log.warn(msg)
363 uninstall_sw = (
364 False # Allow to remove k8s cluster without removing Tiller
365 )
366
367 if uninstall_sw:
368 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
369
370 # delete cluster directory
371 self.log.debug("Removing directory {}".format(cluster_uuid))
372 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
373 # Remove also local directorio if still exist
374 direct = self.fs.path + "/" + cluster_uuid
375 shutil.rmtree(direct, ignore_errors=True)
376
377 return True
378
379 def _is_helm_chart_a_file(self, chart_name: str):
380 return chart_name.count("/") > 1
381
382 async def _install_impl(
383 self,
384 cluster_id: str,
385 kdu_model: str,
386 paths: dict,
387 env: dict,
388 kdu_instance: str,
389 atomic: bool = True,
390 timeout: float = 300,
391 params: dict = None,
392 db_dict: dict = None,
393 kdu_name: str = None,
394 namespace: str = None,
395 ):
396 # init env, paths
397 paths, env = self._init_paths_env(
398 cluster_name=cluster_id, create_if_not_exist=True
399 )
400
401 # params to str
402 params_str, file_to_delete = self._params_to_file_option(
403 cluster_id=cluster_id, params=params
404 )
405
406 # version
407 kdu_model, version = self._split_version(kdu_model)
408
409 _, repo = self._split_repo(kdu_model)
410 if repo:
411 await self.repo_update(cluster_id, repo)
412
413 command = self._get_install_command(
414 kdu_model,
415 kdu_instance,
416 namespace,
417 params_str,
418 version,
419 atomic,
420 timeout,
421 paths["kube_config"],
422 )
423
424 self.log.debug("installing: {}".format(command))
425
426 if atomic:
427 # exec helm in a task
428 exec_task = asyncio.ensure_future(
429 coro_or_future=self._local_async_exec(
430 command=command, raise_exception_on_error=False, env=env
431 )
432 )
433
434 # write status in another task
435 status_task = asyncio.ensure_future(
436 coro_or_future=self._store_status(
437 cluster_id=cluster_id,
438 kdu_instance=kdu_instance,
439 namespace=namespace,
440 db_dict=db_dict,
441 operation="install",
442 )
443 )
444
445 # wait for execution task
446 await asyncio.wait([exec_task])
447
448 # cancel status task
449 status_task.cancel()
450
451 output, rc = exec_task.result()
452
453 else:
454 output, rc = await self._local_async_exec(
455 command=command, raise_exception_on_error=False, env=env
456 )
457
458 # remove temporal values yaml file
459 if file_to_delete:
460 os.remove(file_to_delete)
461
462 # write final status
463 await self._store_status(
464 cluster_id=cluster_id,
465 kdu_instance=kdu_instance,
466 namespace=namespace,
467 db_dict=db_dict,
468 operation="install",
469 )
470
471 if rc != 0:
472 msg = "Error executing command: {}\nOutput: {}".format(command, output)
473 self.log.error(msg)
474 raise K8sException(msg)
475
476 async def upgrade(
477 self,
478 cluster_uuid: str,
479 kdu_instance: str,
480 kdu_model: str = None,
481 atomic: bool = True,
482 timeout: float = 300,
483 params: dict = None,
484 db_dict: dict = None,
485 namespace: str = None,
486 force: bool = False,
487 ):
488 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
489
490 # sync local dir
491 self.fs.sync(from_path=cluster_uuid)
492
493 # look for instance to obtain namespace
494
495 # set namespace
496 if not namespace:
497 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
498 if not instance_info:
499 raise K8sException("kdu_instance {} not found".format(kdu_instance))
500 namespace = instance_info["namespace"]
501
502 # init env, paths
503 paths, env = self._init_paths_env(
504 cluster_name=cluster_uuid, create_if_not_exist=True
505 )
506
507 # sync local dir
508 self.fs.sync(from_path=cluster_uuid)
509
510 # params to str
511 params_str, file_to_delete = self._params_to_file_option(
512 cluster_id=cluster_uuid, params=params
513 )
514
515 # version
516 kdu_model, version = self._split_version(kdu_model)
517
518 _, repo = self._split_repo(kdu_model)
519 if repo:
520 await self.repo_update(cluster_uuid, repo)
521
522 command = self._get_upgrade_command(
523 kdu_model,
524 kdu_instance,
525 namespace,
526 params_str,
527 version,
528 atomic,
529 timeout,
530 paths["kube_config"],
531 force,
532 )
533
534 self.log.debug("upgrading: {}".format(command))
535
536 if atomic:
537 # exec helm in a task
538 exec_task = asyncio.ensure_future(
539 coro_or_future=self._local_async_exec(
540 command=command, raise_exception_on_error=False, env=env
541 )
542 )
543 # write status in another task
544 status_task = asyncio.ensure_future(
545 coro_or_future=self._store_status(
546 cluster_id=cluster_uuid,
547 kdu_instance=kdu_instance,
548 namespace=namespace,
549 db_dict=db_dict,
550 operation="upgrade",
551 )
552 )
553
554 # wait for execution task
555 await asyncio.wait([exec_task])
556
557 # cancel status task
558 status_task.cancel()
559 output, rc = exec_task.result()
560
561 else:
562 output, rc = await self._local_async_exec(
563 command=command, raise_exception_on_error=False, env=env
564 )
565
566 # remove temporal values yaml file
567 if file_to_delete:
568 os.remove(file_to_delete)
569
570 # write final status
571 await self._store_status(
572 cluster_id=cluster_uuid,
573 kdu_instance=kdu_instance,
574 namespace=namespace,
575 db_dict=db_dict,
576 operation="upgrade",
577 )
578
579 if rc != 0:
580 msg = "Error executing command: {}\nOutput: {}".format(command, output)
581 self.log.error(msg)
582 raise K8sException(msg)
583
584 # sync fs
585 self.fs.reverse_sync(from_path=cluster_uuid)
586
587 # return new revision number
588 instance = await self.get_instance_info(
589 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
590 )
591 if instance:
592 revision = int(instance.get("revision"))
593 self.log.debug("New revision: {}".format(revision))
594 return revision
595 else:
596 return 0
597
598 async def scale(
599 self,
600 kdu_instance: str,
601 scale: int,
602 resource_name: str,
603 total_timeout: float = 1800,
604 cluster_uuid: str = None,
605 kdu_model: str = None,
606 atomic: bool = True,
607 db_dict: dict = None,
608 **kwargs,
609 ):
610 """Scale a resource in a Helm Chart.
611
612 Args:
613 kdu_instance: KDU instance name
614 scale: Scale to which to set the resource
615 resource_name: Resource name
616 total_timeout: The time, in seconds, to wait
617 cluster_uuid: The UUID of the cluster
618 kdu_model: The chart reference
619 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
620 The --wait flag will be set automatically if --atomic is used
621 db_dict: Dictionary for any additional data
622 kwargs: Additional parameters
623
624 Returns:
625 True if successful, False otherwise
626 """
627
628 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
629 if resource_name:
630 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
631 resource_name, kdu_model, cluster_uuid
632 )
633
634 self.log.debug(debug_mgs)
635
636 # look for instance to obtain namespace
637 # get_instance_info function calls the sync command
638 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
639 if not instance_info:
640 raise K8sException("kdu_instance {} not found".format(kdu_instance))
641
642 # init env, paths
643 paths, env = self._init_paths_env(
644 cluster_name=cluster_uuid, create_if_not_exist=True
645 )
646
647 # version
648 kdu_model, version = self._split_version(kdu_model)
649
650 repo_url = await self._find_repo(kdu_model, cluster_uuid)
651
652 _, replica_str = await self._get_replica_count_url(
653 kdu_model, repo_url, resource_name
654 )
655
656 command = self._get_upgrade_scale_command(
657 kdu_model,
658 kdu_instance,
659 instance_info["namespace"],
660 scale,
661 version,
662 atomic,
663 replica_str,
664 total_timeout,
665 resource_name,
666 paths["kube_config"],
667 )
668
669 self.log.debug("scaling: {}".format(command))
670
671 if atomic:
672 # exec helm in a task
673 exec_task = asyncio.ensure_future(
674 coro_or_future=self._local_async_exec(
675 command=command, raise_exception_on_error=False, env=env
676 )
677 )
678 # write status in another task
679 status_task = asyncio.ensure_future(
680 coro_or_future=self._store_status(
681 cluster_id=cluster_uuid,
682 kdu_instance=kdu_instance,
683 namespace=instance_info["namespace"],
684 db_dict=db_dict,
685 operation="scale",
686 )
687 )
688
689 # wait for execution task
690 await asyncio.wait([exec_task])
691
692 # cancel status task
693 status_task.cancel()
694 output, rc = exec_task.result()
695
696 else:
697 output, rc = await self._local_async_exec(
698 command=command, raise_exception_on_error=False, env=env
699 )
700
701 # write final status
702 await self._store_status(
703 cluster_id=cluster_uuid,
704 kdu_instance=kdu_instance,
705 namespace=instance_info["namespace"],
706 db_dict=db_dict,
707 operation="scale",
708 )
709
710 if rc != 0:
711 msg = "Error executing command: {}\nOutput: {}".format(command, output)
712 self.log.error(msg)
713 raise K8sException(msg)
714
715 # sync fs
716 self.fs.reverse_sync(from_path=cluster_uuid)
717
718 return True
719
720 async def get_scale_count(
721 self,
722 resource_name: str,
723 kdu_instance: str,
724 cluster_uuid: str,
725 kdu_model: str,
726 **kwargs,
727 ) -> int:
728 """Get a resource scale count.
729
730 Args:
731 cluster_uuid: The UUID of the cluster
732 resource_name: Resource name
733 kdu_instance: KDU instance name
734 kdu_model: The name or path of an Helm Chart
735 kwargs: Additional parameters
736
737 Returns:
738 Resource instance count
739 """
740
741 self.log.debug(
742 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
743 )
744
745 # look for instance to obtain namespace
746 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
747 if not instance_info:
748 raise K8sException("kdu_instance {} not found".format(kdu_instance))
749
750 # init env, paths
751 paths, _ = self._init_paths_env(
752 cluster_name=cluster_uuid, create_if_not_exist=True
753 )
754
755 replicas = await self._get_replica_count_instance(
756 kdu_instance=kdu_instance,
757 namespace=instance_info["namespace"],
758 kubeconfig=paths["kube_config"],
759 resource_name=resource_name,
760 )
761
762 self.log.debug(
763 f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
764 )
765
766 # Get default value if scale count is not found from provided values
767 # Important note: this piece of code shall only be executed in the first scaling operation,
768 # since it is expected that the _get_replica_count_instance is able to obtain the number of
769 # replicas when a scale operation was already conducted previously for this KDU/resource!
770 if replicas is None:
771 repo_url = await self._find_repo(
772 kdu_model=kdu_model, cluster_uuid=cluster_uuid
773 )
774 replicas, _ = await self._get_replica_count_url(
775 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
776 )
777
778 self.log.debug(
779 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
780 f"{resource_name} obtained: {replicas}"
781 )
782
783 if replicas is None:
784 msg = "Replica count not found. Cannot be scaled"
785 self.log.error(msg)
786 raise K8sException(msg)
787
788 return int(replicas)
789
790 async def rollback(
791 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
792 ):
793 self.log.debug(
794 "rollback kdu_instance {} to revision {} from cluster {}".format(
795 kdu_instance, revision, cluster_uuid
796 )
797 )
798
799 # sync local dir
800 self.fs.sync(from_path=cluster_uuid)
801
802 # look for instance to obtain namespace
803 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
804 if not instance_info:
805 raise K8sException("kdu_instance {} not found".format(kdu_instance))
806
807 # init env, paths
808 paths, env = self._init_paths_env(
809 cluster_name=cluster_uuid, create_if_not_exist=True
810 )
811
812 # sync local dir
813 self.fs.sync(from_path=cluster_uuid)
814
815 command = self._get_rollback_command(
816 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
817 )
818
819 self.log.debug("rolling_back: {}".format(command))
820
821 # exec helm in a task
822 exec_task = asyncio.ensure_future(
823 coro_or_future=self._local_async_exec(
824 command=command, raise_exception_on_error=False, env=env
825 )
826 )
827 # write status in another task
828 status_task = asyncio.ensure_future(
829 coro_or_future=self._store_status(
830 cluster_id=cluster_uuid,
831 kdu_instance=kdu_instance,
832 namespace=instance_info["namespace"],
833 db_dict=db_dict,
834 operation="rollback",
835 )
836 )
837
838 # wait for execution task
839 await asyncio.wait([exec_task])
840
841 # cancel status task
842 status_task.cancel()
843
844 output, rc = exec_task.result()
845
846 # write final status
847 await self._store_status(
848 cluster_id=cluster_uuid,
849 kdu_instance=kdu_instance,
850 namespace=instance_info["namespace"],
851 db_dict=db_dict,
852 operation="rollback",
853 )
854
855 if rc != 0:
856 msg = "Error executing command: {}\nOutput: {}".format(command, output)
857 self.log.error(msg)
858 raise K8sException(msg)
859
860 # sync fs
861 self.fs.reverse_sync(from_path=cluster_uuid)
862
863 # return new revision number
864 instance = await self.get_instance_info(
865 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
866 )
867 if instance:
868 revision = int(instance.get("revision"))
869 self.log.debug("New revision: {}".format(revision))
870 return revision
871 else:
872 return 0
873
874 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
875 """
876 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
877 (this call should happen after all _terminate-config-primitive_ of the VNF
878 are invoked).
879
880 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
881 :param kdu_instance: unique name for the KDU instance to be deleted
882 :param kwargs: Additional parameters (None yet)
883 :return: True if successful
884 """
885
886 self.log.debug(
887 "uninstall kdu_instance {} from cluster {}".format(
888 kdu_instance, cluster_uuid
889 )
890 )
891
892 # sync local dir
893 self.fs.sync(from_path=cluster_uuid)
894
895 # look for instance to obtain namespace
896 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
897 if not instance_info:
898 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
899 return True
900 # init env, paths
901 paths, env = self._init_paths_env(
902 cluster_name=cluster_uuid, create_if_not_exist=True
903 )
904
905 # sync local dir
906 self.fs.sync(from_path=cluster_uuid)
907
908 command = self._get_uninstall_command(
909 kdu_instance, instance_info["namespace"], paths["kube_config"]
910 )
911 output, _rc = await self._local_async_exec(
912 command=command, raise_exception_on_error=True, env=env
913 )
914
915 # sync fs
916 self.fs.reverse_sync(from_path=cluster_uuid)
917
918 return self._output_to_table(output)
919
920 async def instances_list(self, cluster_uuid: str) -> list:
921 """
922 returns a list of deployed releases in a cluster
923
924 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
925 :return:
926 """
927
928 self.log.debug("list releases for cluster {}".format(cluster_uuid))
929
930 # sync local dir
931 self.fs.sync(from_path=cluster_uuid)
932
933 # execute internal command
934 result = await self._instances_list(cluster_uuid)
935
936 # sync fs
937 self.fs.reverse_sync(from_path=cluster_uuid)
938
939 return result
940
941 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
942 instances = await self.instances_list(cluster_uuid=cluster_uuid)
943 for instance in instances:
944 if instance.get("name") == kdu_instance:
945 return instance
946 self.log.debug("Instance {} not found".format(kdu_instance))
947 return None
948
949 async def upgrade_charm(
950 self,
951 ee_id: str = None,
952 path: str = None,
953 charm_id: str = None,
954 charm_type: str = None,
955 timeout: float = None,
956 ) -> str:
957 """This method upgrade charms in VNFs
958
959 Args:
960 ee_id: Execution environment id
961 path: Local path to the charm
962 charm_id: charm-id
963 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
964 timeout: (Float) Timeout for the ns update operation
965
966 Returns:
967 The output of the update operation if status equals to "completed"
968 """
969 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
970
971 async def exec_primitive(
972 self,
973 cluster_uuid: str = None,
974 kdu_instance: str = None,
975 primitive_name: str = None,
976 timeout: float = 300,
977 params: dict = None,
978 db_dict: dict = None,
979 **kwargs,
980 ) -> str:
981 """Exec primitive (Juju action)
982
983 :param cluster_uuid: The UUID of the cluster or namespace:cluster
984 :param kdu_instance: The unique name of the KDU instance
985 :param primitive_name: Name of action that will be executed
986 :param timeout: Timeout for action execution
987 :param params: Dictionary of all the parameters needed for the action
988 :db_dict: Dictionary for any additional data
989 :param kwargs: Additional parameters (None yet)
990
991 :return: Returns the output of the action
992 """
993 raise K8sException(
994 "KDUs deployed with Helm don't support actions "
995 "different from rollback, upgrade and status"
996 )
997
998 async def get_services(
999 self, cluster_uuid: str, kdu_instance: str, namespace: str
1000 ) -> list:
1001 """
1002 Returns a list of services defined for the specified kdu instance.
1003
1004 :param cluster_uuid: UUID of a K8s cluster known by OSM
1005 :param kdu_instance: unique name for the KDU instance
1006 :param namespace: K8s namespace used by the KDU instance
1007 :return: If successful, it will return a list of services, Each service
1008 can have the following data:
1009 - `name` of the service
1010 - `type` type of service in the k8 cluster
1011 - `ports` List of ports offered by the service, for each port includes at least
1012 name, port, protocol
1013 - `cluster_ip` Internal ip to be used inside k8s cluster
1014 - `external_ip` List of external ips (in case they are available)
1015 """
1016
1017 self.log.debug(
1018 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1019 cluster_uuid, kdu_instance
1020 )
1021 )
1022
1023 # init env, paths
1024 paths, env = self._init_paths_env(
1025 cluster_name=cluster_uuid, create_if_not_exist=True
1026 )
1027
1028 # sync local dir
1029 self.fs.sync(from_path=cluster_uuid)
1030
1031 # get list of services names for kdu
1032 service_names = await self._get_services(
1033 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1034 )
1035
1036 service_list = []
1037 for service in service_names:
1038 service = await self._get_service(cluster_uuid, service, namespace)
1039 service_list.append(service)
1040
1041 # sync fs
1042 self.fs.reverse_sync(from_path=cluster_uuid)
1043
1044 return service_list
1045
1046 async def get_service(
1047 self, cluster_uuid: str, service_name: str, namespace: str
1048 ) -> object:
1049 self.log.debug(
1050 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1051 service_name, namespace, cluster_uuid
1052 )
1053 )
1054
1055 # sync local dir
1056 self.fs.sync(from_path=cluster_uuid)
1057
1058 service = await self._get_service(cluster_uuid, service_name, namespace)
1059
1060 # sync fs
1061 self.fs.reverse_sync(from_path=cluster_uuid)
1062
1063 return service
1064
1065 async def status_kdu(
1066 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1067 ) -> Union[str, dict]:
1068 """
1069 This call would retrieve tha current state of a given KDU instance. It would be
1070 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1071 values_ of the configuration parameters applied to a given instance. This call
1072 would be based on the `status` call.
1073
1074 :param cluster_uuid: UUID of a K8s cluster known by OSM
1075 :param kdu_instance: unique name for the KDU instance
1076 :param kwargs: Additional parameters (None yet)
1077 :param yaml_format: if the return shall be returned as an YAML string or as a
1078 dictionary
1079 :return: If successful, it will return the following vector of arguments:
1080 - K8s `namespace` in the cluster where the KDU lives
1081 - `state` of the KDU instance. It can be:
1082 - UNKNOWN
1083 - DEPLOYED
1084 - DELETED
1085 - SUPERSEDED
1086 - FAILED or
1087 - DELETING
1088 - List of `resources` (objects) that this release consists of, sorted by kind,
1089 and the status of those resources
1090 - Last `deployment_time`.
1091
1092 """
1093 self.log.debug(
1094 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1095 cluster_uuid, kdu_instance
1096 )
1097 )
1098
1099 # sync local dir
1100 self.fs.sync(from_path=cluster_uuid)
1101
1102 # get instance: needed to obtain namespace
1103 instances = await self._instances_list(cluster_id=cluster_uuid)
1104 for instance in instances:
1105 if instance.get("name") == kdu_instance:
1106 break
1107 else:
1108 # instance does not exist
1109 raise K8sException(
1110 "Instance name: {} not found in cluster: {}".format(
1111 kdu_instance, cluster_uuid
1112 )
1113 )
1114
1115 status = await self._status_kdu(
1116 cluster_id=cluster_uuid,
1117 kdu_instance=kdu_instance,
1118 namespace=instance["namespace"],
1119 yaml_format=yaml_format,
1120 show_error_log=True,
1121 )
1122
1123 # sync fs
1124 self.fs.reverse_sync(from_path=cluster_uuid)
1125
1126 return status
1127
1128 async def get_values_kdu(
1129 self, kdu_instance: str, namespace: str, kubeconfig: str
1130 ) -> str:
1131 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1132
1133 return await self._exec_get_command(
1134 get_command="values",
1135 kdu_instance=kdu_instance,
1136 namespace=namespace,
1137 kubeconfig=kubeconfig,
1138 )
1139
1140 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1141 """Method to obtain the Helm Chart package's values
1142
1143 Args:
1144 kdu_model: The name or path of an Helm Chart
1145 repo_url: Helm Chart repository url
1146
1147 Returns:
1148 str: the values of the Helm Chart package
1149 """
1150
1151 self.log.debug(
1152 "inspect kdu_model values {} from (optional) repo: {}".format(
1153 kdu_model, repo_url
1154 )
1155 )
1156
1157 return await self._exec_inspect_command(
1158 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1159 )
1160
1161 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1162 self.log.debug(
1163 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1164 )
1165
1166 return await self._exec_inspect_command(
1167 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1168 )
1169
1170 async def synchronize_repos(self, cluster_uuid: str):
1171 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1172 try:
1173 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1174 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1175
1176 local_repo_list = await self.repo_list(cluster_uuid)
1177 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1178
1179 deleted_repo_list = []
1180 added_repo_dict = {}
1181
1182 # iterate over the list of repos in the database that should be
1183 # added if not present
1184 for repo_name, db_repo in db_repo_dict.items():
1185 try:
1186 # check if it is already present
1187 curr_repo_url = local_repo_dict.get(db_repo["name"])
1188 repo_id = db_repo.get("_id")
1189 if curr_repo_url != db_repo["url"]:
1190 if curr_repo_url:
1191 self.log.debug(
1192 "repo {} url changed, delete and and again".format(
1193 db_repo["url"]
1194 )
1195 )
1196 await self.repo_remove(cluster_uuid, db_repo["name"])
1197 deleted_repo_list.append(repo_id)
1198
1199 # add repo
1200 self.log.debug("add repo {}".format(db_repo["name"]))
1201 if "ca_cert" in db_repo:
1202 await self.repo_add(
1203 cluster_uuid,
1204 db_repo["name"],
1205 db_repo["url"],
1206 cert=db_repo["ca_cert"],
1207 )
1208 else:
1209 await self.repo_add(
1210 cluster_uuid,
1211 db_repo["name"],
1212 db_repo["url"],
1213 )
1214 added_repo_dict[repo_id] = db_repo["name"]
1215 except Exception as e:
1216 raise K8sException(
1217 "Error adding repo id: {}, err_msg: {} ".format(
1218 repo_id, repr(e)
1219 )
1220 )
1221
1222 # Delete repos that are present but not in nbi_list
1223 for repo_name in local_repo_dict:
1224 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1225 self.log.debug("delete repo {}".format(repo_name))
1226 try:
1227 await self.repo_remove(cluster_uuid, repo_name)
1228 deleted_repo_list.append(repo_name)
1229 except Exception as e:
1230 self.warning(
1231 "Error deleting repo, name: {}, err_msg: {}".format(
1232 repo_name, str(e)
1233 )
1234 )
1235
1236 return deleted_repo_list, added_repo_dict
1237
1238 except K8sException:
1239 raise
1240 except Exception as e:
1241 # Do not raise errors synchronizing repos
1242 self.log.error("Error synchronizing repos: {}".format(e))
1243 raise Exception("Error synchronizing repos: {}".format(e))
1244
1245 def _get_db_repos_dict(self, repo_ids: list):
1246 db_repos_dict = {}
1247 for repo_id in repo_ids:
1248 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1249 db_repos_dict[db_repo["name"]] = db_repo
1250 return db_repos_dict
1251
1252 """
1253 ####################################################################################
1254 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1255 ####################################################################################
1256 """
1257
1258 @abc.abstractmethod
1259 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1260 """
1261 Creates and returns base cluster and kube dirs and returns them.
1262 Also created helm3 dirs according to new directory specification, paths are
1263 not returned but assigned to helm environment variables
1264
1265 :param cluster_name: cluster_name
1266 :return: Dictionary with config_paths and dictionary with helm environment variables
1267 """
1268
1269 @abc.abstractmethod
1270 async def _cluster_init(self, cluster_id, namespace, paths, env):
1271 """
1272 Implements the helm version dependent cluster initialization
1273 """
1274
1275 @abc.abstractmethod
1276 async def _instances_list(self, cluster_id):
1277 """
1278 Implements the helm version dependent helm instances list
1279 """
1280
1281 @abc.abstractmethod
1282 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1283 """
1284 Implements the helm version dependent method to obtain services from a helm instance
1285 """
1286
1287 @abc.abstractmethod
1288 async def _status_kdu(
1289 self,
1290 cluster_id: str,
1291 kdu_instance: str,
1292 namespace: str = None,
1293 yaml_format: bool = False,
1294 show_error_log: bool = False,
1295 ) -> Union[str, dict]:
1296 """
1297 Implements the helm version dependent method to obtain status of a helm instance
1298 """
1299
1300 @abc.abstractmethod
1301 def _get_install_command(
1302 self,
1303 kdu_model,
1304 kdu_instance,
1305 namespace,
1306 params_str,
1307 version,
1308 atomic,
1309 timeout,
1310 kubeconfig,
1311 ) -> str:
1312 """
1313 Obtain command to be executed to delete the indicated instance
1314 """
1315
1316 @abc.abstractmethod
1317 def _get_upgrade_scale_command(
1318 self,
1319 kdu_model,
1320 kdu_instance,
1321 namespace,
1322 count,
1323 version,
1324 atomic,
1325 replicas,
1326 timeout,
1327 resource_name,
1328 kubeconfig,
1329 ) -> str:
1330 """Generates the command to scale a Helm Chart release
1331
1332 Args:
1333 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1334 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1335 namespace (str): Namespace where this KDU instance is deployed
1336 scale (int): Scale count
1337 version (str): Constraint with specific version of the Chart to use
1338 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1339 The --wait flag will be set automatically if --atomic is used
1340 replica_str (str): The key under resource_name key where the scale count is stored
1341 timeout (float): The time, in seconds, to wait
1342 resource_name (str): The KDU's resource to scale
1343 kubeconfig (str): Kubeconfig file path
1344
1345 Returns:
1346 str: command to scale a Helm Chart release
1347 """
1348
1349 @abc.abstractmethod
1350 def _get_upgrade_command(
1351 self,
1352 kdu_model,
1353 kdu_instance,
1354 namespace,
1355 params_str,
1356 version,
1357 atomic,
1358 timeout,
1359 kubeconfig,
1360 force,
1361 ) -> str:
1362 """Generates the command to upgrade a Helm Chart release
1363
1364 Args:
1365 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1366 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1367 namespace (str): Namespace where this KDU instance is deployed
1368 params_str (str): Params used to upgrade the Helm Chart release
1369 version (str): Constraint with specific version of the Chart to use
1370 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1371 The --wait flag will be set automatically if --atomic is used
1372 timeout (float): The time, in seconds, to wait
1373 kubeconfig (str): Kubeconfig file path
1374 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1375 Returns:
1376 str: command to upgrade a Helm Chart release
1377 """
1378
1379 @abc.abstractmethod
1380 def _get_rollback_command(
1381 self, kdu_instance, namespace, revision, kubeconfig
1382 ) -> str:
1383 """
1384 Obtain command to be executed to rollback the indicated instance
1385 """
1386
1387 @abc.abstractmethod
1388 def _get_uninstall_command(
1389 self, kdu_instance: str, namespace: str, kubeconfig: str
1390 ) -> str:
1391 """
1392 Obtain command to be executed to delete the indicated instance
1393 """
1394
1395 @abc.abstractmethod
1396 def _get_inspect_command(
1397 self, show_command: str, kdu_model: str, repo_str: str, version: str
1398 ):
1399 """Generates the command to obtain the information about an Helm Chart package
1400 (´helm show ...´ command)
1401
1402 Args:
1403 show_command: the second part of the command (`helm show <show_command>`)
1404 kdu_model: The name or path of an Helm Chart
1405 repo_url: Helm Chart repository url
1406 version: constraint with specific version of the Chart to use
1407
1408 Returns:
1409 str: the generated Helm Chart command
1410 """
1411
1412 @abc.abstractmethod
1413 def _get_get_command(
1414 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1415 ):
1416 """Obtain command to be executed to get information about the kdu instance."""
1417
1418 @abc.abstractmethod
1419 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1420 """
1421 Method call to uninstall cluster software for helm. This method is dependent
1422 of helm version
1423 For Helm v2 it will be called when Tiller must be uninstalled
1424 For Helm v3 it does nothing and does not need to be callled
1425 """
1426
1427 @abc.abstractmethod
1428 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1429 """
1430 Obtains the cluster repos identifiers
1431 """
1432
1433 """
1434 ####################################################################################
1435 ################################### P R I V A T E ##################################
1436 ####################################################################################
1437 """
1438
1439 @staticmethod
1440 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1441 if os.path.exists(filename):
1442 return True
1443 else:
1444 msg = "File {} does not exist".format(filename)
1445 if exception_if_not_exists:
1446 raise K8sException(msg)
1447
1448 @staticmethod
1449 def _remove_multiple_spaces(strobj):
1450 strobj = strobj.strip()
1451 while " " in strobj:
1452 strobj = strobj.replace(" ", " ")
1453 return strobj
1454
1455 @staticmethod
1456 def _output_to_lines(output: str) -> list:
1457 output_lines = list()
1458 lines = output.splitlines(keepends=False)
1459 for line in lines:
1460 line = line.strip()
1461 if len(line) > 0:
1462 output_lines.append(line)
1463 return output_lines
1464
1465 @staticmethod
1466 def _output_to_table(output: str) -> list:
1467 output_table = list()
1468 lines = output.splitlines(keepends=False)
1469 for line in lines:
1470 line = line.replace("\t", " ")
1471 line_list = list()
1472 output_table.append(line_list)
1473 cells = line.split(sep=" ")
1474 for cell in cells:
1475 cell = cell.strip()
1476 if len(cell) > 0:
1477 line_list.append(cell)
1478 return output_table
1479
1480 @staticmethod
1481 def _parse_services(output: str) -> list:
1482 lines = output.splitlines(keepends=False)
1483 services = []
1484 for line in lines:
1485 line = line.replace("\t", " ")
1486 cells = line.split(sep=" ")
1487 if len(cells) > 0 and cells[0].startswith("service/"):
1488 elems = cells[0].split(sep="/")
1489 if len(elems) > 1:
1490 services.append(elems[1])
1491 return services
1492
1493 @staticmethod
1494 def _get_deep(dictionary: dict, members: tuple):
1495 target = dictionary
1496 value = None
1497 try:
1498 for m in members:
1499 value = target.get(m)
1500 if not value:
1501 return None
1502 else:
1503 target = value
1504 except Exception:
1505 pass
1506 return value
1507
1508 # find key:value in several lines
1509 @staticmethod
1510 def _find_in_lines(p_lines: list, p_key: str) -> str:
1511 for line in p_lines:
1512 try:
1513 if line.startswith(p_key + ":"):
1514 parts = line.split(":")
1515 the_value = parts[1].strip()
1516 return the_value
1517 except Exception:
1518 # ignore it
1519 pass
1520 return None
1521
1522 @staticmethod
1523 def _lower_keys_list(input_list: list):
1524 """
1525 Transform the keys in a list of dictionaries to lower case and returns a new list
1526 of dictionaries
1527 """
1528 new_list = []
1529 if input_list:
1530 for dictionary in input_list:
1531 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1532 new_list.append(new_dict)
1533 return new_list
1534
1535 async def _local_async_exec(
1536 self,
1537 command: str,
1538 raise_exception_on_error: bool = False,
1539 show_error_log: bool = True,
1540 encode_utf8: bool = False,
1541 env: dict = None,
1542 ) -> tuple[str, int]:
1543 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1544 self.log.debug(
1545 "Executing async local command: {}, env: {}".format(command, env)
1546 )
1547
1548 # split command
1549 command = shlex.split(command)
1550
1551 environ = os.environ.copy()
1552 if env:
1553 environ.update(env)
1554
1555 try:
1556 async with self.cmd_lock:
1557 process = await asyncio.create_subprocess_exec(
1558 *command,
1559 stdout=asyncio.subprocess.PIPE,
1560 stderr=asyncio.subprocess.PIPE,
1561 env=environ,
1562 )
1563
1564 # wait for command terminate
1565 stdout, stderr = await process.communicate()
1566
1567 return_code = process.returncode
1568
1569 output = ""
1570 if stdout:
1571 output = stdout.decode("utf-8").strip()
1572 # output = stdout.decode()
1573 if stderr:
1574 output = stderr.decode("utf-8").strip()
1575 # output = stderr.decode()
1576
1577 if return_code != 0 and show_error_log:
1578 self.log.debug(
1579 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1580 )
1581 else:
1582 self.log.debug("Return code: {}".format(return_code))
1583
1584 if raise_exception_on_error and return_code != 0:
1585 raise K8sException(output)
1586
1587 if encode_utf8:
1588 output = output.encode("utf-8").strip()
1589 output = str(output).replace("\\n", "\n")
1590
1591 return output, return_code
1592
1593 except asyncio.CancelledError:
1594 # first, kill the process if it is still running
1595 if process.returncode is None:
1596 process.kill()
1597 raise
1598 except K8sException:
1599 raise
1600 except Exception as e:
1601 msg = "Exception executing command: {} -> {}".format(command, e)
1602 self.log.error(msg)
1603 if raise_exception_on_error:
1604 raise K8sException(e) from e
1605 else:
1606 return "", -1
1607
1608 async def _local_async_exec_pipe(
1609 self,
1610 command1: str,
1611 command2: str,
1612 raise_exception_on_error: bool = True,
1613 show_error_log: bool = True,
1614 encode_utf8: bool = False,
1615 env: dict = None,
1616 ):
1617 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1618 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1619 command = "{} | {}".format(command1, command2)
1620 self.log.debug(
1621 "Executing async local command: {}, env: {}".format(command, env)
1622 )
1623
1624 # split command
1625 command1 = shlex.split(command1)
1626 command2 = shlex.split(command2)
1627
1628 environ = os.environ.copy()
1629 if env:
1630 environ.update(env)
1631
1632 try:
1633 async with self.cmd_lock:
1634 read, write = os.pipe()
1635 process_1 = await asyncio.create_subprocess_exec(
1636 *command1, stdout=write, env=environ
1637 )
1638 os.close(write)
1639 process_2 = await asyncio.create_subprocess_exec(
1640 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1641 )
1642 os.close(read)
1643 stdout, stderr = await process_2.communicate()
1644
1645 return_code = process_2.returncode
1646
1647 output = ""
1648 if stdout:
1649 output = stdout.decode("utf-8").strip()
1650 # output = stdout.decode()
1651 if stderr:
1652 output = stderr.decode("utf-8").strip()
1653 # output = stderr.decode()
1654
1655 if return_code != 0 and show_error_log:
1656 self.log.debug(
1657 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1658 )
1659 else:
1660 self.log.debug("Return code: {}".format(return_code))
1661
1662 if raise_exception_on_error and return_code != 0:
1663 raise K8sException(output)
1664
1665 if encode_utf8:
1666 output = output.encode("utf-8").strip()
1667 output = str(output).replace("\\n", "\n")
1668
1669 return output, return_code
1670 except asyncio.CancelledError:
1671 # first, kill the processes if they are still running
1672 for process in (process_1, process_2):
1673 if process.returncode is None:
1674 process.kill()
1675 raise
1676 except K8sException:
1677 raise
1678 except Exception as e:
1679 msg = "Exception executing command: {} -> {}".format(command, e)
1680 self.log.error(msg)
1681 if raise_exception_on_error:
1682 raise K8sException(e) from e
1683 else:
1684 return "", -1
1685
1686 async def _get_service(self, cluster_id, service_name, namespace):
1687 """
1688 Obtains the data of the specified service in the k8cluster.
1689
1690 :param cluster_id: id of a K8s cluster known by OSM
1691 :param service_name: name of the K8s service in the specified namespace
1692 :param namespace: K8s namespace used by the KDU instance
1693 :return: If successful, it will return a service with the following data:
1694 - `name` of the service
1695 - `type` type of service in the k8 cluster
1696 - `ports` List of ports offered by the service, for each port includes at least
1697 name, port, protocol
1698 - `cluster_ip` Internal ip to be used inside k8s cluster
1699 - `external_ip` List of external ips (in case they are available)
1700 """
1701
1702 # init config, env
1703 paths, env = self._init_paths_env(
1704 cluster_name=cluster_id, create_if_not_exist=True
1705 )
1706
1707 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1708 self.kubectl_command,
1709 paths["kube_config"],
1710 quote(namespace),
1711 quote(service_name),
1712 )
1713
1714 output, _rc = await self._local_async_exec(
1715 command=command, raise_exception_on_error=True, env=env
1716 )
1717
1718 data = yaml.load(output, Loader=yaml.SafeLoader)
1719
1720 service = {
1721 "name": service_name,
1722 "type": self._get_deep(data, ("spec", "type")),
1723 "ports": self._get_deep(data, ("spec", "ports")),
1724 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1725 }
1726 if service["type"] == "LoadBalancer":
1727 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1728 ip_list = [elem["ip"] for elem in ip_map_list]
1729 service["external_ip"] = ip_list
1730
1731 return service
1732
1733 async def _exec_get_command(
1734 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1735 ):
1736 """Obtains information about the kdu instance."""
1737
1738 full_command = self._get_get_command(
1739 get_command, kdu_instance, namespace, kubeconfig
1740 )
1741
1742 output, _rc = await self._local_async_exec(command=full_command)
1743
1744 return output
1745
1746 async def _exec_inspect_command(
1747 self, inspect_command: str, kdu_model: str, repo_url: str = None
1748 ):
1749 """Obtains information about an Helm Chart package (´helm show´ command)
1750
1751 Args:
1752 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1753 kdu_model: The name or path of an Helm Chart
1754 repo_url: Helm Chart repository url
1755
1756 Returns:
1757 str: the requested info about the Helm Chart package
1758 """
1759
1760 repo_str = ""
1761 if repo_url:
1762 repo_str = " --repo {}".format(quote(repo_url))
1763
1764 # Obtain the Chart's name and store it in the var kdu_model
1765 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1766
1767 kdu_model, version = self._split_version(kdu_model)
1768 if version:
1769 version_str = "--version {}".format(quote(version))
1770 else:
1771 version_str = ""
1772
1773 full_command = self._get_inspect_command(
1774 show_command=inspect_command,
1775 kdu_model=quote(kdu_model),
1776 repo_str=repo_str,
1777 version=version_str,
1778 )
1779
1780 output, _ = await self._local_async_exec(command=full_command)
1781
1782 return output
1783
1784 async def _get_replica_count_url(
1785 self,
1786 kdu_model: str,
1787 repo_url: str = None,
1788 resource_name: str = None,
1789 ) -> tuple[int, str]:
1790 """Get the replica count value in the Helm Chart Values.
1791
1792 Args:
1793 kdu_model: The name or path of an Helm Chart
1794 repo_url: Helm Chart repository url
1795 resource_name: Resource name
1796
1797 Returns:
1798 A tuple with:
1799 - The number of replicas of the specific instance; if not found, returns None; and
1800 - The string corresponding to the replica count key in the Helm values
1801 """
1802
1803 kdu_values = yaml.load(
1804 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1805 Loader=yaml.SafeLoader,
1806 )
1807
1808 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1809
1810 if not kdu_values:
1811 raise K8sException(
1812 "kdu_values not found for kdu_model {}".format(kdu_model)
1813 )
1814
1815 if resource_name:
1816 kdu_values = kdu_values.get(resource_name, None)
1817
1818 if not kdu_values:
1819 msg = "resource {} not found in the values in model {}".format(
1820 resource_name, kdu_model
1821 )
1822 self.log.error(msg)
1823 raise K8sException(msg)
1824
1825 duplicate_check = False
1826
1827 replica_str = ""
1828 replicas = None
1829
1830 if kdu_values.get("replicaCount") is not None:
1831 replicas = kdu_values["replicaCount"]
1832 replica_str = "replicaCount"
1833 elif kdu_values.get("replicas") is not None:
1834 duplicate_check = True
1835 replicas = kdu_values["replicas"]
1836 replica_str = "replicas"
1837 else:
1838 if resource_name:
1839 msg = (
1840 "replicaCount or replicas not found in the resource"
1841 "{} values in model {}. Cannot be scaled".format(
1842 resource_name, kdu_model
1843 )
1844 )
1845 else:
1846 msg = (
1847 "replicaCount or replicas not found in the values"
1848 "in model {}. Cannot be scaled".format(kdu_model)
1849 )
1850 self.log.error(msg)
1851 raise K8sException(msg)
1852
1853 # Control if replicas and replicaCount exists at the same time
1854 msg = "replicaCount and replicas are exists at the same time"
1855 if duplicate_check:
1856 if "replicaCount" in kdu_values:
1857 self.log.error(msg)
1858 raise K8sException(msg)
1859 else:
1860 if "replicas" in kdu_values:
1861 self.log.error(msg)
1862 raise K8sException(msg)
1863
1864 return replicas, replica_str
1865
1866 async def _get_replica_count_instance(
1867 self,
1868 kdu_instance: str,
1869 namespace: str,
1870 kubeconfig: str,
1871 resource_name: str = None,
1872 ) -> int:
1873 """Get the replica count value in the instance.
1874
1875 Args:
1876 kdu_instance: The name of the KDU instance
1877 namespace: KDU instance namespace
1878 kubeconfig:
1879 resource_name: Resource name
1880
1881 Returns:
1882 The number of replicas of the specific instance; if not found, returns None
1883 """
1884
1885 kdu_values = yaml.load(
1886 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1887 Loader=yaml.SafeLoader,
1888 )
1889
1890 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1891
1892 replicas = None
1893
1894 if kdu_values:
1895 resource_values = (
1896 kdu_values.get(resource_name, None) if resource_name else None
1897 )
1898
1899 for replica_str in ("replicaCount", "replicas"):
1900 if resource_values:
1901 replicas = resource_values.get(replica_str)
1902 else:
1903 replicas = kdu_values.get(replica_str)
1904
1905 if replicas is not None:
1906 break
1907
1908 return replicas
1909
1910 async def _store_status(
1911 self,
1912 cluster_id: str,
1913 operation: str,
1914 kdu_instance: str,
1915 namespace: str = None,
1916 db_dict: dict = None,
1917 ) -> None:
1918 """
1919 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1920
1921 :param cluster_id (str): the cluster where the KDU instance is deployed
1922 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1923 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1924 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1925 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1926 values for the keys:
1927 - "collection": The Mongo DB collection to write to
1928 - "filter": The query filter to use in the update process
1929 - "path": The dot separated keys which targets the object to be updated
1930 Defaults to None.
1931 """
1932
1933 try:
1934 detailed_status = await self._status_kdu(
1935 cluster_id=cluster_id,
1936 kdu_instance=kdu_instance,
1937 yaml_format=False,
1938 namespace=namespace,
1939 )
1940
1941 status = detailed_status.get("info").get("description")
1942 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1943
1944 # write status to db
1945 result = await self.write_app_status_to_db(
1946 db_dict=db_dict,
1947 status=str(status),
1948 detailed_status=str(detailed_status),
1949 operation=operation,
1950 )
1951
1952 if not result:
1953 self.log.info("Error writing in database. Task exiting...")
1954
1955 except asyncio.CancelledError as e:
1956 self.log.warning(
1957 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1958 )
1959 except Exception as e:
1960 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1961
1962 # params for use in -f file
1963 # returns values file option and filename (in order to delete it at the end)
1964 def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
1965 if params and len(params) > 0:
1966 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1967
1968 def get_random_number():
1969 r = random.SystemRandom().randint(1, 99999999)
1970 s = str(r)
1971 while len(s) < 10:
1972 s = "0" + s
1973 return s
1974
1975 params2 = dict()
1976 for key in params:
1977 value = params.get(key)
1978 if "!!yaml" in str(value):
1979 value = yaml.safe_load(value[7:])
1980 params2[key] = value
1981
1982 values_file = get_random_number() + ".yaml"
1983 with open(values_file, "w") as stream:
1984 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1985
1986 return "-f {}".format(values_file), values_file
1987
1988 return "", None
1989
1990 # params for use in --set option
1991 @staticmethod
1992 def _params_to_set_option(params: dict) -> str:
1993 pairs = [
1994 f"{quote(str(key))}={quote(str(value))}"
1995 for key, value in params.items()
1996 if value is not None
1997 ]
1998 if not pairs:
1999 return ""
2000 return "--set " + ",".join(pairs)
2001
2002 @staticmethod
2003 def generate_kdu_instance_name(**kwargs):
2004 chart_name = kwargs["kdu_model"]
2005 # check embeded chart (file or dir)
2006 if chart_name.startswith("/"):
2007 # extract file or directory name
2008 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2009 # check URL
2010 elif "://" in chart_name:
2011 # extract last portion of URL
2012 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2013
2014 name = ""
2015 for c in chart_name:
2016 if c.isalpha() or c.isnumeric():
2017 name += c
2018 else:
2019 name += "-"
2020 if len(name) > 35:
2021 name = name[0:35]
2022
2023 # if does not start with alpha character, prefix 'a'
2024 if not name[0].isalpha():
2025 name = "a" + name
2026
2027 name += "-"
2028
2029 def get_random_number():
2030 r = random.SystemRandom().randint(1, 99999999)
2031 s = str(r)
2032 s = s.rjust(10, "0")
2033 return s
2034
2035 name = name + get_random_number()
2036 return name.lower()
2037
2038 def _split_version(self, kdu_model: str) -> tuple[str, str]:
2039 version = None
2040 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
2041 parts = kdu_model.split(sep=":")
2042 if len(parts) == 2:
2043 version = str(parts[1])
2044 kdu_model = parts[0]
2045 return kdu_model, version
2046
2047 def _split_repo(self, kdu_model: str) -> tuple[str, str]:
2048 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2049
2050 Args:
2051 kdu_model (str): Associated KDU model
2052
2053 Returns:
2054 (str, str): Tuple with the Chart name in index 0, and the repo name
2055 in index 2; if there was a problem finding them, return None
2056 for both
2057 """
2058
2059 chart_name = None
2060 repo_name = None
2061
2062 idx = kdu_model.find("/")
2063 if idx >= 0:
2064 chart_name = kdu_model[idx + 1 :]
2065 repo_name = kdu_model[:idx]
2066
2067 return chart_name, repo_name
2068
2069 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2070 """Obtain the Helm repository for an Helm Chart
2071
2072 Args:
2073 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2074 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2075
2076 Returns:
2077 str: the repository URL; if Helm Chart is a local one, the function returns None
2078 """
2079
2080 _, repo_name = self._split_repo(kdu_model=kdu_model)
2081
2082 repo_url = None
2083 if repo_name:
2084 # Find repository link
2085 local_repo_list = await self.repo_list(cluster_uuid)
2086 for repo in local_repo_list:
2087 if repo["name"] == repo_name:
2088 repo_url = repo["url"]
2089 break # it is not necessary to continue the loop if the repo link was found...
2090
2091 return repo_url
2092
2093 async def create_certificate(
2094 self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2095 ):
2096 paths, env = self._init_paths_env(
2097 cluster_name=cluster_uuid, create_if_not_exist=True
2098 )
2099 kubectl = Kubectl(config_file=paths["kube_config"])
2100 await kubectl.create_certificate(
2101 namespace=namespace,
2102 name=name,
2103 dns_prefix=dns_prefix,
2104 secret_name=secret_name,
2105 usages=[usage],
2106 issuer_name="ca-issuer",
2107 )
2108
2109 async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2110 paths, env = self._init_paths_env(
2111 cluster_name=cluster_uuid, create_if_not_exist=True
2112 )
2113 kubectl = Kubectl(config_file=paths["kube_config"])
2114 await kubectl.delete_certificate(namespace, certificate_name)
2115
2116 async def create_namespace(
2117 self,
2118 namespace,
2119 cluster_uuid,
2120 labels,
2121 ):
2122 """
2123 Create a namespace in a specific cluster
2124
2125 :param namespace: Namespace to be created
2126 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2127 :param labels: Dictionary with labels for the new namespace
2128 :returns: None
2129 """
2130 paths, env = self._init_paths_env(
2131 cluster_name=cluster_uuid, create_if_not_exist=True
2132 )
2133 kubectl = Kubectl(config_file=paths["kube_config"])
2134 await kubectl.create_namespace(
2135 name=namespace,
2136 labels=labels,
2137 )
2138
2139 async def delete_namespace(
2140 self,
2141 namespace,
2142 cluster_uuid,
2143 ):
2144 """
2145 Delete a namespace in a specific cluster
2146
2147 :param namespace: namespace to be deleted
2148 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2149 :returns: None
2150 """
2151 paths, env = self._init_paths_env(
2152 cluster_name=cluster_uuid, create_if_not_exist=True
2153 )
2154 kubectl = Kubectl(config_file=paths["kube_config"])
2155 await kubectl.delete_namespace(
2156 name=namespace,
2157 )
2158
2159 async def copy_secret_data(
2160 self,
2161 src_secret: str,
2162 dst_secret: str,
2163 cluster_uuid: str,
2164 data_key: str,
2165 src_namespace: str = "osm",
2166 dst_namespace: str = "osm",
2167 ):
2168 """
2169 Copy a single key and value from an existing secret to a new one
2170
2171 :param src_secret: name of the existing secret
2172 :param dst_secret: name of the new secret
2173 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2174 :param data_key: key of the existing secret to be copied
2175 :param src_namespace: Namespace of the existing secret
2176 :param dst_namespace: Namespace of the new secret
2177 :returns: None
2178 """
2179 paths, env = self._init_paths_env(
2180 cluster_name=cluster_uuid, create_if_not_exist=True
2181 )
2182 kubectl = Kubectl(config_file=paths["kube_config"])
2183 secret_data = await kubectl.get_secret_content(
2184 name=src_secret,
2185 namespace=src_namespace,
2186 )
2187 # Only the corresponding data_key value needs to be copy
2188 data = {data_key: secret_data.get(data_key)}
2189 await kubectl.create_secret(
2190 name=dst_secret,
2191 data=data,
2192 namespace=dst_namespace,
2193 secret_type="Opaque",
2194 )
2195
2196 async def setup_default_rbac(
2197 self,
2198 name,
2199 namespace,
2200 cluster_uuid,
2201 api_groups,
2202 resources,
2203 verbs,
2204 service_account,
2205 ):
2206 """
2207 Create a basic RBAC for a new namespace.
2208
2209 :param name: name of both Role and Role Binding
2210 :param namespace: K8s namespace
2211 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2212 :param api_groups: Api groups to be allowed in Policy Rule
2213 :param resources: Resources to be allowed in Policy Rule
2214 :param verbs: Verbs to be allowed in Policy Rule
2215 :param service_account: Service Account name used to bind the Role
2216 :returns: None
2217 """
2218 paths, env = self._init_paths_env(
2219 cluster_name=cluster_uuid, create_if_not_exist=True
2220 )
2221 kubectl = Kubectl(config_file=paths["kube_config"])
2222 await kubectl.create_role(
2223 name=name,
2224 labels={},
2225 namespace=namespace,
2226 api_groups=api_groups,
2227 resources=resources,
2228 verbs=verbs,
2229 )
2230 await kubectl.create_role_binding(
2231 name=name,
2232 labels={},
2233 namespace=namespace,
2234 role_name=name,
2235 sa_name=service_account,
2236 )