abf2d7e582391417df279403ad8a01d81cb1640f
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 from shlex import quote
26 import random
27 import time
28 import shlex
29 import shutil
30 import stat
31 import os
32 import yaml
33 from uuid import uuid4
34 from urllib.parse import urlparse
35
36 from n2vc.config import EnvironConfig
37 from n2vc.exceptions import K8sException
38 from n2vc.k8s_conn import K8sConnector
39 from n2vc.kubectl import Kubectl
40
41
42 class K8sHelmBaseConnector(K8sConnector):
43
44 """
45 ####################################################################################
46 ################################### P U B L I C ####################################
47 ####################################################################################
48 """
49
50 service_account = "osm"
51
    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initialize the base Helm connector.

        Raises (via _check_file_exists) if the kubectl or helm binaries are
        not present on disk.

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # environment-backed configuration (reads OSM config from env vars)
        self.config = EnvironConfig()
        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # obtain stable repo url from config or apply default
        # the literal string "None" disables the stable repo explicitly
        self._stable_repo_url = self.config.get("stablerepourl")
        if self._stable_repo_url == "None":
            self._stable_repo_url = None

        # Lock to avoid concurrent execution of helm commands
        self.cmd_lock = asyncio.Lock()
98
99 def _get_namespace(self, cluster_uuid: str) -> str:
100 """
101 Obtains the namespace used by the cluster with the uuid passed by argument
102
103 param: cluster_uuid: cluster's uuid
104 """
105
106 # first, obtain the cluster corresponding to the uuid passed by argument
107 k8scluster = self.db.get_one(
108 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
109 )
110 return k8scluster.get("namespace")
111
112 async def init_env(
113 self,
114 k8s_creds: str,
115 namespace: str = "kube-system",
116 reuse_cluster_uuid=None,
117 **kwargs,
118 ) -> tuple[str, bool]:
119 """
120 It prepares a given K8s cluster environment to run Charts
121
122 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
123 '.kube/config'
124 :param namespace: optional namespace to be used for helm. By default,
125 'kube-system' will be used
126 :param reuse_cluster_uuid: existing cluster uuid for reuse
127 :param kwargs: Additional parameters (None yet)
128 :return: uuid of the K8s cluster and True if connector has installed some
129 software in the cluster
130 (on error, an exception will be raised)
131 """
132
133 if reuse_cluster_uuid:
134 cluster_id = reuse_cluster_uuid
135 else:
136 cluster_id = str(uuid4())
137
138 self.log.debug(
139 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
140 )
141
142 paths, env = self._init_paths_env(
143 cluster_name=cluster_id, create_if_not_exist=True
144 )
145 mode = stat.S_IRUSR | stat.S_IWUSR
146 with open(paths["kube_config"], "w", mode) as f:
147 f.write(k8s_creds)
148 os.chmod(paths["kube_config"], 0o600)
149
150 # Code with initialization specific of helm version
151 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
152
153 # sync fs with local data
154 self.fs.reverse_sync(from_path=cluster_id)
155
156 self.log.info("Cluster {} initialized".format(cluster_id))
157
158 return cluster_id, n2vc_installed_sw
159
    async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = "chart",
        cert: str = None,
        user: str = None,
        password: str = None,
        oci: bool = False,
    ):
        """
        Register a helm chart repository (or log into an OCI registry) in the
        cluster's helm environment.

        :param cluster_uuid: UUID of the target cluster
        :param name: repository name
        :param url: repository url (or OCI registry reference)
        :param repo_type: repository type, used for logging only
        :param cert: optional CA certificate content, written to a temp file
        :param user: optional username
        :param password: optional password
        :param oci: when True run "helm registry login" instead of "helm repo add"
        """
        self.log.debug(
            "Cluster {}, adding {} repository {}. URL: {}".format(
                cluster_uuid, repo_type, name, url
            )
        )

        # init_env
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        if oci:
            if user and password:
                # "helm registry login" expects host[:port], so strip the scheme
                host_port = urlparse(url).netloc if url.startswith("oci://") else url
                # helm registry login url
                command = "env KUBECONFIG={} {} registry login {}".format(
                    paths["kube_config"], self._helm_command, quote(host_port)
                )
            else:
                # anonymous OCI registries require no login step
                self.log.debug(
                    "OCI registry login is not needed for repo: {}".format(name)
                )
                return
        else:
            # helm repo add name url
            command = "env KUBECONFIG={} {} repo add {} {}".format(
                paths["kube_config"], self._helm_command, quote(name), quote(url)
            )

        if cert:
            # persist the CA cert so helm can reference it via --ca-file
            temp_cert_file = os.path.join(
                self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
            )
            os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
            with open(temp_cert_file, "w") as the_cert:
                the_cert.write(cert)
            command += " --ca-file {}".format(quote(temp_cert_file))

        if user:
            command += " --username={}".format(quote(user))

        if password:
            command += " --password={}".format(quote(password))

        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if not oci:
            # helm repo update: refresh the index right away; a failure here is
            # non-fatal (raise_exception_on_error=False)
            command = "env KUBECONFIG={} {} repo update {}".format(
                paths["kube_config"], self._helm_command, quote(name)
            )
            self.log.debug("updating repo: {}".format(command))
            await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)
235
236 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
237 self.log.debug(
238 "Cluster {}, updating {} repository {}".format(
239 cluster_uuid, repo_type, name
240 )
241 )
242
243 # init_env
244 paths, env = self._init_paths_env(
245 cluster_name=cluster_uuid, create_if_not_exist=True
246 )
247
248 # sync local dir
249 self.fs.sync(from_path=cluster_uuid)
250
251 # helm repo update
252 command = "{} repo update {}".format(self._helm_command, quote(name))
253 self.log.debug("updating repo: {}".format(command))
254 await self._local_async_exec(
255 command=command, raise_exception_on_error=False, env=env
256 )
257
258 # sync fs
259 self.fs.reverse_sync(from_path=cluster_uuid)
260
261 async def repo_list(self, cluster_uuid: str) -> list:
262 """
263 Get the list of registered repositories
264
265 :return: list of registered repositories: [ (name, url) .... ]
266 """
267
268 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
269
270 # config filename
271 paths, env = self._init_paths_env(
272 cluster_name=cluster_uuid, create_if_not_exist=True
273 )
274
275 # sync local dir
276 self.fs.sync(from_path=cluster_uuid)
277
278 command = "env KUBECONFIG={} {} repo list --output yaml".format(
279 paths["kube_config"], self._helm_command
280 )
281
282 # Set exception to false because if there are no repos just want an empty list
283 output, _rc = await self._local_async_exec(
284 command=command, raise_exception_on_error=False, env=env
285 )
286
287 # sync fs
288 self.fs.reverse_sync(from_path=cluster_uuid)
289
290 if _rc == 0:
291 if output and len(output) > 0:
292 repos = yaml.load(output, Loader=yaml.SafeLoader)
293 # unify format between helm2 and helm3 setting all keys lowercase
294 return self._lower_keys_list(repos)
295 else:
296 return []
297 else:
298 return []
299
300 async def repo_remove(self, cluster_uuid: str, name: str):
301 self.log.debug(
302 "remove {} repositories for cluster {}".format(name, cluster_uuid)
303 )
304
305 # init env, paths
306 paths, env = self._init_paths_env(
307 cluster_name=cluster_uuid, create_if_not_exist=True
308 )
309
310 # sync local dir
311 self.fs.sync(from_path=cluster_uuid)
312
313 command = "env KUBECONFIG={} {} repo remove {}".format(
314 paths["kube_config"], self._helm_command, quote(name)
315 )
316 await self._local_async_exec(
317 command=command, raise_exception_on_error=True, env=env
318 )
319
320 # sync fs
321 self.fs.reverse_sync(from_path=cluster_uuid)
322
    async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False,
        **kwargs,
    ) -> bool:
        """Reset a cluster

        Resets the Kubernetes cluster by removing the helm deployment that represents it.

        :param cluster_uuid: The UUID of the cluster to reset
        :param force: uninstall existing releases even if present
        :param uninstall_sw: also uninstall the software this connector installed
        :param kwargs: Additional parameters (None yet)
        :return: Returns True if successful or raises an exception.
        """
        namespace = self._get_namespace(cluster_uuid=cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_uuid, uninstall_sw
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("name")
                            chart = r.get("chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            # will not raise exception as it was found
                            # that in some cases of previously installed helm releases it
                            # raised an error
                            self.log.warn(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    # releases present but not forced: keep the helm environment
                    # and skip software uninstall below
                    msg = (
                        "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                    ).format(cluster_uuid)
                    self.log.warn(msg)
                    uninstall_sw = (
                        False  # Allow to remove k8s cluster without removing Tiller
                    )

        if uninstall_sw:
            await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

        # delete cluster directory
        self.log.debug("Removing directory {}".format(cluster_uuid))
        self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
        # Remove also local directory if it still exists
        direct = self.fs.path + "/" + cluster_uuid
        shutil.rmtree(direct, ignore_errors=True)

        return True
394
395 def _is_helm_chart_a_file(self, chart_name: str):
396 return chart_name.count("/") > 1
397
398 @staticmethod
399 def _is_helm_chart_a_url(chart_name: str):
400 result = urlparse(chart_name)
401 return all([result.scheme, result.netloc])
402
    async def _install_impl(
        self,
        cluster_id: str,
        kdu_model: str,
        paths: dict,
        env: dict,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """
        Shared install implementation: build and run the helm install command,
        tracking intermediate status in the database while it runs.

        :param cluster_id: UUID of the target cluster
        :param kdu_model: chart reference (optionally "<chart>:<version>")
        :param paths: helm environment paths (NOTE: overwritten below, see comment)
        :param env: helm environment variables (NOTE: overwritten below)
        :param kdu_instance: release name to create
        :param atomic: roll back automatically if the install fails
        :param timeout: helm operation timeout in seconds
        :param params: values to apply, written to a temporary file
        :param db_dict: where to write intermediate operation status
        :param kdu_name: KDU name (currently unused here)
        :param namespace: target namespace
        :raises K8sException: when the helm command exits with a non-zero code
        """
        # init env, paths
        # NOTE(review): the paths/env arguments are immediately overwritten, so
        # callers' values are ignored — confirm whether those parameters can be
        # dropped from the signature.
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str (written to a temp values file; deleted further below)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task (runs concurrently with the install)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)
491
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        namespace: str = None,
        reset_values: bool = False,
        reuse_values: bool = True,
        reset_then_reuse_values: bool = False,
        force: bool = False,
    ):
        """
        Upgrade a deployed KDU instance to a new chart model/version.

        :param cluster_uuid: UUID of the cluster where the instance lives
        :param kdu_instance: release name to upgrade
        :param kdu_model: chart reference (optionally "<chart>:<version>")
        :param atomic: roll back automatically if the upgrade fails
        :param timeout: helm operation timeout in seconds
        :param params: values to apply, written to a temporary file
        :param db_dict: where to write intermediate operation status
        :param namespace: target namespace; looked up from the release if omitted
        :param reset_values: helm --reset-values behaviour
        :param reuse_values: helm --reuse-values behaviour
        :param reset_then_reuse_values: helm --reset-then-reuse-values behaviour
        :param force: force resource updates on upgrade
        :return: new revision number of the release, or 0 if it cannot be read
        :raises K8sException: if the instance is not found or helm fails
        """
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace

        # set namespace
        if not namespace:
            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
            if not instance_info:
                raise K8sException("kdu_instance {} not found".format(kdu_instance))
            namespace = instance_info["namespace"]

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # params to str (written to a temp values file; deleted further below)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_uuid, params=params
        )

        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
            reset_values,
            reuse_values,
            reset_then_reuse_values,
            force,
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task (runs concurrently with the upgrade)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="upgrade",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
614
    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        cluster_uuid: str = None,
        kdu_model: str = None,
        atomic: bool = True,
        db_dict: dict = None,
        **kwargs,
    ):
        """Scale a resource in a Helm Chart.

        Args:
            kdu_instance: KDU instance name
            scale: Scale to which to set the resource
            resource_name: Resource name
            total_timeout: The time, in seconds, to wait
            cluster_uuid: The UUID of the cluster
            kdu_model: The chart reference
            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            db_dict: Dictionary for any additional data
            kwargs: Additional parameters

        Returns:
            True if successful, False otherwise

        Raises:
            K8sException: if the instance is not found or helm fails
        """

        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_mgs = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_mgs)

        # look for instance to obtain namespace
        # get_instance_info function calls the sync command
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # version
        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)

        repo_url = await self._find_repo(kdu_model, cluster_uuid)

        # obtain the values key that controls the replica count for this resource
        _, replica_str = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        command = self._get_upgrade_scale_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            scale,
            version,
            atomic,
            replica_str,
            total_timeout,
            resource_name,
            paths["kube_config"],
        )

        self.log.debug("scaling: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task (runs concurrently with the scaling)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="scale",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="scale",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return True
736
    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        cluster_uuid: str,
        kdu_model: str,
        **kwargs,
    ) -> int:
        """Get a resource scale count.

        Args:
            cluster_uuid: The UUID of the cluster
            resource_name: Resource name
            kdu_instance: KDU instance name
            kdu_model: The name or path of an Helm Chart
            kwargs: Additional parameters

        Returns:
            Resource instance count

        Raises:
            K8sException: if the instance is not found or no replica count
                can be determined
        """

        self.log.debug(
            "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
        )

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, _ = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # first try the values currently applied to the deployed instance
        replicas = await self._get_replica_count_instance(
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            kubeconfig=paths["kube_config"],
            resource_name=resource_name,
        )

        self.log.debug(
            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
        )

        # Get default value if scale count is not found from provided values
        # Important note: this piece of code shall only be executed in the first scaling operation,
        # since it is expected that the _get_replica_count_instance is able to obtain the number of
        # replicas when a scale operation was already conducted previously for this KDU/resource!
        if replicas is None:
            repo_url = await self._find_repo(
                kdu_model=kdu_model, cluster_uuid=cluster_uuid
            )
            replicas, _ = await self._get_replica_count_url(
                kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
            )

            self.log.debug(
                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
                f"{resource_name} obtained: {replicas}"
            )

            if replicas is None:
                msg = "Replica count not found. Cannot be scaled"
                self.log.error(msg)
                raise K8sException(msg)

        return int(replicas)
806
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """
        Roll back a KDU instance to a previous revision.

        :param cluster_uuid: UUID of the cluster where the instance lives
        :param kdu_instance: release name to roll back
        :param revision: target revision (0 means the previous one, per helm)
        :param db_dict: where to write intermediate operation status
        :return: new revision number of the release, or 0 if it cannot be read
        :raises K8sException: if the instance is not found or helm fails
        """
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_rollback_command(
            kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
        )

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task (runs concurrently with the rollback)
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
890
    async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
        (this call should happen after all _terminate-config-primitive_ of the VNF
        are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name for the KDU instance to be deleted
        :param kwargs: Additional parameters (None yet)
        :return: True if the instance was already gone; otherwise the parsed
            output of the helm uninstall command
        """

        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            # idempotent: an already-removed instance is not an error
            self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
            return True
        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_uninstall_command(
            kdu_instance, instance_info["namespace"], paths["kube_config"]
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return self._output_to_table(output)
936
937 async def instances_list(self, cluster_uuid: str) -> list:
938 """
939 returns a list of deployed releases in a cluster
940
941 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
942 :return:
943 """
944
945 self.log.debug("list releases for cluster {}".format(cluster_uuid))
946
947 # sync local dir
948 self.fs.sync(from_path=cluster_uuid)
949
950 # execute internal command
951 result = await self._instances_list(cluster_uuid)
952
953 # sync fs
954 self.fs.reverse_sync(from_path=cluster_uuid)
955
956 return result
957
958 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
959 instances = await self.instances_list(cluster_uuid=cluster_uuid)
960 for instance in instances:
961 if instance.get("name") == kdu_instance:
962 return instance
963 self.log.debug("Instance {} not found".format(kdu_instance))
964 return None
965
966 async def upgrade_charm(
967 self,
968 ee_id: str = None,
969 path: str = None,
970 charm_id: str = None,
971 charm_type: str = None,
972 timeout: float = None,
973 ) -> str:
974 """This method upgrade charms in VNFs
975
976 Args:
977 ee_id: Execution environment id
978 path: Local path to the charm
979 charm_id: charm-id
980 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
981 timeout: (Float) Timeout for the ns update operation
982
983 Returns:
984 The output of the update operation if status equals to "completed"
985 """
986 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
987
988 async def exec_primitive(
989 self,
990 cluster_uuid: str = None,
991 kdu_instance: str = None,
992 primitive_name: str = None,
993 timeout: float = 300,
994 params: dict = None,
995 db_dict: dict = None,
996 **kwargs,
997 ) -> str:
998 """Exec primitive (Juju action)
999
1000 :param cluster_uuid: The UUID of the cluster or namespace:cluster
1001 :param kdu_instance: The unique name of the KDU instance
1002 :param primitive_name: Name of action that will be executed
1003 :param timeout: Timeout for action execution
1004 :param params: Dictionary of all the parameters needed for the action
1005 :db_dict: Dictionary for any additional data
1006 :param kwargs: Additional parameters (None yet)
1007
1008 :return: Returns the output of the action
1009 """
1010 raise K8sException(
1011 "KDUs deployed with Helm don't support actions "
1012 "different from rollback, upgrade and status"
1013 )
1014
1015 async def get_services(
1016 self, cluster_uuid: str, kdu_instance: str, namespace: str
1017 ) -> list:
1018 """
1019 Returns a list of services defined for the specified kdu instance.
1020
1021 :param cluster_uuid: UUID of a K8s cluster known by OSM
1022 :param kdu_instance: unique name for the KDU instance
1023 :param namespace: K8s namespace used by the KDU instance
1024 :return: If successful, it will return a list of services, Each service
1025 can have the following data:
1026 - `name` of the service
1027 - `type` type of service in the k8 cluster
1028 - `ports` List of ports offered by the service, for each port includes at least
1029 name, port, protocol
1030 - `cluster_ip` Internal ip to be used inside k8s cluster
1031 - `external_ip` List of external ips (in case they are available)
1032 """
1033
1034 self.log.debug(
1035 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1036 cluster_uuid, kdu_instance
1037 )
1038 )
1039
1040 # init env, paths
1041 paths, env = self._init_paths_env(
1042 cluster_name=cluster_uuid, create_if_not_exist=True
1043 )
1044
1045 # sync local dir
1046 self.fs.sync(from_path=cluster_uuid)
1047
1048 # get list of services names for kdu
1049 service_names = await self._get_services(
1050 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1051 )
1052
1053 service_list = []
1054 for service in service_names:
1055 service = await self._get_service(cluster_uuid, service, namespace)
1056 service_list.append(service)
1057
1058 # sync fs
1059 self.fs.reverse_sync(from_path=cluster_uuid)
1060
1061 return service_list
1062
1063 async def get_service(
1064 self, cluster_uuid: str, service_name: str, namespace: str
1065 ) -> object:
1066 self.log.debug(
1067 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1068 service_name, namespace, cluster_uuid
1069 )
1070 )
1071
1072 # sync local dir
1073 self.fs.sync(from_path=cluster_uuid)
1074
1075 service = await self._get_service(cluster_uuid, service_name, namespace)
1076
1077 # sync fs
1078 self.fs.reverse_sync(from_path=cluster_uuid)
1079
1080 return service
1081
    async def status_kdu(
        self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
    ) -> Union[str, dict]:
        """
        This call would retrieve tha current state of a given KDU instance. It would be
        would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
        values_ of the configuration parameters applied to a given instance. This call
        would be based on the `status` call.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :param yaml_format: if the return shall be returned as an YAML string or as a
            dictionary
        :return: If successful, it will return the following vector of arguments:
            - K8s `namespace` in the cluster where the KDU lives
            - `state` of the KDU instance. It can be:
                - UNKNOWN
                - DEPLOYED
                - DELETED
                - SUPERSEDED
                - FAILED or
                - DELETING
            - List of `resources` (objects) that this release consists of, sorted by kind,
                and the status of those resources
            - Last `deployment_time`.

        :raises K8sException: if the instance does not exist in the cluster
        """
        self.log.debug(
            "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # get instance: needed to obtain namespace
        instances = await self._instances_list(cluster_id=cluster_uuid)
        for instance in instances:
            if instance.get("name") == kdu_instance:
                break
        else:
            # instance does not exist
            raise K8sException(
                "Instance name: {} not found in cluster: {}".format(
                    kdu_instance, cluster_uuid
                )
            )

        # "instance" is bound by the loop break above
        status = await self._status_kdu(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance["namespace"],
            yaml_format=yaml_format,
            show_error_log=True,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return status
1144
1145 async def get_values_kdu(
1146 self, kdu_instance: str, namespace: str, kubeconfig: str
1147 ) -> str:
1148 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1149
1150 return await self._exec_get_command(
1151 get_command="values",
1152 kdu_instance=kdu_instance,
1153 namespace=namespace,
1154 kubeconfig=kubeconfig,
1155 )
1156
1157 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1158 """Method to obtain the Helm Chart package's values
1159
1160 Args:
1161 kdu_model: The name or path of an Helm Chart
1162 repo_url: Helm Chart repository url
1163
1164 Returns:
1165 str: the values of the Helm Chart package
1166 """
1167
1168 self.log.debug(
1169 "inspect kdu_model values {} from (optional) repo: {}".format(
1170 kdu_model, repo_url
1171 )
1172 )
1173
1174 return await self._exec_inspect_command(
1175 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1176 )
1177
1178 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1179 self.log.debug(
1180 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1181 )
1182
1183 return await self._exec_inspect_command(
1184 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1185 )
1186
1187 async def synchronize_repos(self, cluster_uuid: str):
1188 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1189 try:
1190 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1191 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1192
1193 local_repo_list = await self.repo_list(cluster_uuid)
1194 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1195
1196 deleted_repo_list = []
1197 added_repo_dict = {}
1198
1199 # iterate over the list of repos in the database that should be
1200 # added if not present
1201 for repo_name, db_repo in db_repo_dict.items():
1202 try:
1203 # check if it is already present
1204 curr_repo_url = local_repo_dict.get(db_repo["name"])
1205 repo_id = db_repo.get("_id")
1206 if curr_repo_url != db_repo["url"]:
1207 if curr_repo_url:
1208 self.log.debug(
1209 "repo {} url changed, delete and and again".format(
1210 db_repo["url"]
1211 )
1212 )
1213 await self.repo_remove(cluster_uuid, db_repo["name"])
1214 deleted_repo_list.append(repo_id)
1215
1216 # add repo
1217 self.log.debug("add repo {}".format(db_repo["name"]))
1218 await self.repo_add(
1219 cluster_uuid,
1220 db_repo["name"],
1221 db_repo["url"],
1222 cert=db_repo.get("ca_cert"),
1223 user=db_repo.get("user"),
1224 password=db_repo.get("password"),
1225 oci=db_repo.get("oci", False),
1226 )
1227 added_repo_dict[repo_id] = db_repo["name"]
1228 except Exception as e:
1229 raise K8sException(
1230 "Error adding repo id: {}, err_msg: {} ".format(
1231 repo_id, repr(e)
1232 )
1233 )
1234
1235 # Delete repos that are present but not in nbi_list
1236 for repo_name in local_repo_dict:
1237 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1238 self.log.debug("delete repo {}".format(repo_name))
1239 try:
1240 await self.repo_remove(cluster_uuid, repo_name)
1241 deleted_repo_list.append(repo_name)
1242 except Exception as e:
1243 self.warning(
1244 "Error deleting repo, name: {}, err_msg: {}".format(
1245 repo_name, str(e)
1246 )
1247 )
1248
1249 return deleted_repo_list, added_repo_dict
1250
1251 except K8sException:
1252 raise
1253 except Exception as e:
1254 # Do not raise errors synchronizing repos
1255 self.log.error("Error synchronizing repos: {}".format(e))
1256 raise Exception("Error synchronizing repos: {}".format(e))
1257
1258 def _get_db_repos_dict(self, repo_ids: list):
1259 db_repos_dict = {}
1260 for repo_id in repo_ids:
1261 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1262 db_repos_dict[db_repo["name"]] = db_repo
1263 return db_repos_dict
1264
1265 """
1266 ####################################################################################
1267 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1268 ####################################################################################
1269 """
1270
    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns base cluster and kube dirs and returns them.
        Also creates helm3 dirs according to the new directory specification;
        those paths are not returned but assigned to helm environment variables.

        :param cluster_name: cluster_name
        :param create_if_not_exist: create the directories when they do not exist yet
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
1281
    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization

        :param cluster_id: cluster identifier
        :param namespace: K8s namespace to initialize into
        :param paths: dictionary of cluster config paths (see _init_paths_env)
        :param env: dictionary of helm environment variables (see _init_paths_env)
        """
1287
    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent helm instances list

        :param cluster_id: cluster identifier
        :return: list of deployed releases; each entry is a dict exposing at
            least the "name" and "namespace" keys (used by status_kdu)
        """
1293
    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        """
        Implements the helm version dependent method to obtain services from a helm instance

        :param cluster_id: cluster identifier
        :param kdu_instance: unique name of the KDU instance
        :param namespace: K8s namespace of the instance
        :param kubeconfig: path to the kubeconfig file
        :return: list of service names deployed by the instance
        """
1299
    @abc.abstractmethod
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """
        Implements the helm version dependent method to obtain status of a helm instance

        :param cluster_id: cluster identifier
        :param kdu_instance: unique name of the KDU instance
        :param namespace: K8s namespace of the instance
        :param yaml_format: return the status as a YAML string instead of a dict
        :param show_error_log: log the underlying command output on failure
        :return: the helm status output, as str (YAML) or dict
        """
1312
    @abc.abstractmethod
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """
        Obtain command to be executed to install the indicated instance

        :param kdu_model: chart reference (repo/chart, local path or URL)
        :param kdu_instance: unique name for the new release
        :param namespace: target K8s namespace
        :param params_str: already formatted command fragment with instantiation parameters
        :param version: chart version constraint
        :param atomic: roll back automatically on a failed install
        :param timeout: the time, in seconds, to wait
        :param kubeconfig: path to the kubeconfig file
        :return: the full helm install command line
        """
1328
    @abc.abstractmethod
    def _get_upgrade_scale_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        count,
        version,
        atomic,
        replicas,
        timeout,
        resource_name,
        kubeconfig,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            count (int): Scale count (target number of replicas)
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replicas (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """
1361
    @abc.abstractmethod
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
        reset_values,
        reuse_values,
        reset_then_reuse_values,
        force,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            reset_values (bool): If set, helm resets values instead of reusing previous values.
            reuse_values (bool): If set, helm reuses previous values.
            reset_then_reuse_values (bool): If set, helm resets values, then apply the last release's values
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """
1397
    @abc.abstractmethod
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """
        Obtain command to be executed to rollback the indicated instance

        :param kdu_instance: unique name of the KDU instance
        :param namespace: K8s namespace of the instance
        :param revision: revision number to roll back to
        :param kubeconfig: path to the kubeconfig file
        :return: the full helm rollback command line
        """
1405
    @abc.abstractmethod
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """
        Obtain command to be executed to delete the indicated instance

        :param kdu_instance: unique name of the KDU instance
        :param namespace: K8s namespace of the instance
        :param kubeconfig: path to the kubeconfig file
        :return: the full helm uninstall command line
        """
1413
    @abc.abstractmethod
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generates the command to obtain the information about an Helm Chart package
        (´helm show ...´ command)

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of an Helm Chart
            repo_str: already formatted repo option (" --repo <url>"), or empty string
            version: already formatted version option ("--version <x>"), or empty string

        Returns:
            str: the generated Helm Chart command
        """
1430
    @abc.abstractmethod
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance.

        :param get_command: helm get subcommand (e.g. "values")
        :param kdu_instance: unique name of the KDU instance
        :param namespace: K8s namespace of the instance
        :param kubeconfig: path to the kubeconfig file
        :return: the full helm get command line
        """
1436
    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method call to uninstall cluster software for helm. This method depends
        on the helm version:
        For Helm v2 it will be called when Tiller must be uninstalled
        For Helm v3 it does nothing and does not need to be called
        """
1445
    @abc.abstractmethod
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the cluster repos identifiers

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :return: list of repo ids (looked up as "_id" in the "k8srepos" collection)
        """
1451
1452 """
1453 ####################################################################################
1454 ################################### P R I V A T E ##################################
1455 ####################################################################################
1456 """
1457
1458 @staticmethod
1459 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1460 if os.path.exists(filename):
1461 return True
1462 else:
1463 msg = "File {} does not exist".format(filename)
1464 if exception_if_not_exists:
1465 raise K8sException(msg)
1466
1467 @staticmethod
1468 def _remove_multiple_spaces(strobj):
1469 strobj = strobj.strip()
1470 while " " in strobj:
1471 strobj = strobj.replace(" ", " ")
1472 return strobj
1473
1474 @staticmethod
1475 def _output_to_lines(output: str) -> list:
1476 output_lines = list()
1477 lines = output.splitlines(keepends=False)
1478 for line in lines:
1479 line = line.strip()
1480 if len(line) > 0:
1481 output_lines.append(line)
1482 return output_lines
1483
1484 @staticmethod
1485 def _output_to_table(output: str) -> list:
1486 output_table = list()
1487 lines = output.splitlines(keepends=False)
1488 for line in lines:
1489 line = line.replace("\t", " ")
1490 line_list = list()
1491 output_table.append(line_list)
1492 cells = line.split(sep=" ")
1493 for cell in cells:
1494 cell = cell.strip()
1495 if len(cell) > 0:
1496 line_list.append(cell)
1497 return output_table
1498
1499 @staticmethod
1500 def _parse_services(output: str) -> list:
1501 lines = output.splitlines(keepends=False)
1502 services = []
1503 for line in lines:
1504 line = line.replace("\t", " ")
1505 cells = line.split(sep=" ")
1506 if len(cells) > 0 and cells[0].startswith("service/"):
1507 elems = cells[0].split(sep="/")
1508 if len(elems) > 1:
1509 services.append(elems[1])
1510 return services
1511
1512 @staticmethod
1513 def _get_deep(dictionary: dict, members: tuple):
1514 target = dictionary
1515 value = None
1516 try:
1517 for m in members:
1518 value = target.get(m)
1519 if not value:
1520 return None
1521 else:
1522 target = value
1523 except Exception:
1524 pass
1525 return value
1526
1527 # find key:value in several lines
1528 @staticmethod
1529 def _find_in_lines(p_lines: list, p_key: str) -> str:
1530 for line in p_lines:
1531 try:
1532 if line.startswith(p_key + ":"):
1533 parts = line.split(":")
1534 the_value = parts[1].strip()
1535 return the_value
1536 except Exception:
1537 # ignore it
1538 pass
1539 return None
1540
1541 @staticmethod
1542 def _lower_keys_list(input_list: list):
1543 """
1544 Transform the keys in a list of dictionaries to lower case and returns a new list
1545 of dictionaries
1546 """
1547 new_list = []
1548 if input_list:
1549 for dictionary in input_list:
1550 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1551 new_list.append(new_dict)
1552 return new_list
1553
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ) -> tuple[str, int]:
        """Run a local command asynchronously and capture its output.

        :param command: full command line; multiple spaces are collapsed and the
            string is split with shlex (no shell involved)
        :param raise_exception_on_error: raise K8sException on non-zero return
            code or execution failure
        :param show_error_log: log the output at debug level on failure
        :param encode_utf8: post-process the output through an utf-8
            encode/str round-trip (legacy behaviour kept for callers)
        :param env: extra environment variables overlaid on os.environ
        :return: tuple (output, return_code); ("", -1) on execution errors when
            raise_exception_on_error is False
        """
        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command = shlex.split(command)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # serialize subprocess execution across this connector instance
            async with self.cmd_lock:
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=environ,
                )

                # wait for command terminate
                stdout, stderr = await process.communicate()

                return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # NOTE: when stderr is non-empty it replaces stdout in the
                # returned output (long-standing behaviour callers rely on)
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # first, kill the process if it is still running
            if process.returncode is None:
                process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1626
    async def _local_async_exec_pipe(
        self,
        command1: str,
        command2: str,
        raise_exception_on_error: bool = True,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ):
        """Run "command1 | command2" locally, returning command2's output.

        The two commands are connected through an os.pipe(); the return code
        and output come from the second process only.

        :param command1: producer command line (stdout feeds the pipe)
        :param command2: consumer command line (stdin reads the pipe)
        :param raise_exception_on_error: raise K8sException on non-zero return code
        :param show_error_log: log the output at debug level on failure
        :param encode_utf8: legacy utf-8 encode/str round-trip of the output
        :param env: extra environment variables overlaid on os.environ
        :return: tuple (output, return_code); ("", -1) on execution errors when
            raise_exception_on_error is False
        """
        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # serialize subprocess execution across this connector instance
            async with self.cmd_lock:
                read, write = os.pipe()
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                # close our copy of the write end so process_2 sees EOF
                os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
                os.close(read)
                stdout, stderr = await process_2.communicate()

                return_code = process_2.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # NOTE: stderr, when present, replaces stdout in the output
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # first, kill the processes if they are still running
            for process in (process_1, process_2):
                if process.returncode is None:
                    process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1704
1705 async def _get_service(self, cluster_id, service_name, namespace):
1706 """
1707 Obtains the data of the specified service in the k8cluster.
1708
1709 :param cluster_id: id of a K8s cluster known by OSM
1710 :param service_name: name of the K8s service in the specified namespace
1711 :param namespace: K8s namespace used by the KDU instance
1712 :return: If successful, it will return a service with the following data:
1713 - `name` of the service
1714 - `type` type of service in the k8 cluster
1715 - `ports` List of ports offered by the service, for each port includes at least
1716 name, port, protocol
1717 - `cluster_ip` Internal ip to be used inside k8s cluster
1718 - `external_ip` List of external ips (in case they are available)
1719 """
1720
1721 # init config, env
1722 paths, env = self._init_paths_env(
1723 cluster_name=cluster_id, create_if_not_exist=True
1724 )
1725
1726 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1727 self.kubectl_command,
1728 paths["kube_config"],
1729 quote(namespace),
1730 quote(service_name),
1731 )
1732
1733 output, _rc = await self._local_async_exec(
1734 command=command, raise_exception_on_error=True, env=env
1735 )
1736
1737 data = yaml.load(output, Loader=yaml.SafeLoader)
1738
1739 service = {
1740 "name": service_name,
1741 "type": self._get_deep(data, ("spec", "type")),
1742 "ports": self._get_deep(data, ("spec", "ports")),
1743 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1744 }
1745 if service["type"] == "LoadBalancer":
1746 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1747 ip_list = [elem["ip"] for elem in ip_map_list]
1748 service["external_ip"] = ip_list
1749
1750 return service
1751
1752 async def _exec_get_command(
1753 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1754 ):
1755 """Obtains information about the kdu instance."""
1756
1757 full_command = self._get_get_command(
1758 get_command, kdu_instance, namespace, kubeconfig
1759 )
1760
1761 output, _rc = await self._local_async_exec(command=full_command)
1762
1763 return output
1764
1765 async def _exec_inspect_command(
1766 self, inspect_command: str, kdu_model: str, repo_url: str = None
1767 ):
1768 """Obtains information about an Helm Chart package (´helm show´ command)
1769
1770 Args:
1771 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1772 kdu_model: The name or path of an Helm Chart
1773 repo_url: Helm Chart repository url
1774
1775 Returns:
1776 str: the requested info about the Helm Chart package
1777 """
1778
1779 repo_str = ""
1780 if repo_url:
1781 repo_str = " --repo {}".format(quote(repo_url))
1782
1783 # Obtain the Chart's name and store it in the var kdu_model
1784 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1785
1786 kdu_model, version = self._split_version(kdu_model)
1787 if version:
1788 version_str = "--version {}".format(quote(version))
1789 else:
1790 version_str = ""
1791
1792 full_command = self._get_inspect_command(
1793 show_command=inspect_command,
1794 kdu_model=quote(kdu_model),
1795 repo_str=repo_str,
1796 version=version_str,
1797 )
1798
1799 output, _ = await self._local_async_exec(command=full_command)
1800
1801 return output
1802
    async def _get_replica_count_url(
        self,
        kdu_model: str,
        repo_url: str = None,
        resource_name: str = None,
    ) -> tuple[int, str]:
        """Get the replica count value in the Helm Chart Values.

        Args:
            kdu_model: The name or path of an Helm Chart
            repo_url: Helm Chart repository url
            resource_name: Resource name

        Returns:
            A tuple with:
            - The number of replicas of the specific instance; if not found, returns None; and
            - The string corresponding to the replica count key in the Helm values

        Raises:
            K8sException: when the values cannot be obtained, the resource is
                missing, no replica key is present, or both replica keys exist
        """

        kdu_values = yaml.load(
            await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
            Loader=yaml.SafeLoader,
        )

        self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

        if not kdu_values:
            raise K8sException(
                "kdu_values not found for kdu_model {}".format(kdu_model)
            )

        # narrow the values down to the requested resource's own block
        if resource_name:
            kdu_values = kdu_values.get(resource_name, None)

        if not kdu_values:
            msg = "resource {} not found in the values in model {}".format(
                resource_name, kdu_model
            )
            self.log.error(msg)
            raise K8sException(msg)

        duplicate_check = False

        replica_str = ""
        replicas = None

        # "replicaCount" takes precedence over "replicas"
        if kdu_values.get("replicaCount") is not None:
            replicas = kdu_values["replicaCount"]
            replica_str = "replicaCount"
        elif kdu_values.get("replicas") is not None:
            duplicate_check = True
            replicas = kdu_values["replicas"]
            replica_str = "replicas"
        else:
            if resource_name:
                msg = (
                    "replicaCount or replicas not found in the resource"
                    "{} values in model {}. Cannot be scaled".format(
                        resource_name, kdu_model
                    )
                )
            else:
                msg = (
                    "replicaCount or replicas not found in the values"
                    "in model {}. Cannot be scaled".format(kdu_model)
                )
            self.log.error(msg)
            raise K8sException(msg)

        # Control if replicas and replicaCount exists at the same time
        # (duplicate_check tells which branch above was taken, so the check
        # looks for the presence of the *other* key)
        msg = "replicaCount and replicas are exists at the same time"
        if duplicate_check:
            if "replicaCount" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)
        else:
            if "replicas" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)

        return replicas, replica_str
1884
1885 async def _get_replica_count_instance(
1886 self,
1887 kdu_instance: str,
1888 namespace: str,
1889 kubeconfig: str,
1890 resource_name: str = None,
1891 ) -> int:
1892 """Get the replica count value in the instance.
1893
1894 Args:
1895 kdu_instance: The name of the KDU instance
1896 namespace: KDU instance namespace
1897 kubeconfig:
1898 resource_name: Resource name
1899
1900 Returns:
1901 The number of replicas of the specific instance; if not found, returns None
1902 """
1903
1904 kdu_values = yaml.load(
1905 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1906 Loader=yaml.SafeLoader,
1907 )
1908
1909 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1910
1911 replicas = None
1912
1913 if kdu_values:
1914 resource_values = (
1915 kdu_values.get(resource_name, None) if resource_name else None
1916 )
1917
1918 for replica_str in ("replicaCount", "replicas"):
1919 if resource_values:
1920 replicas = resource_values.get(replica_str)
1921 else:
1922 replicas = kdu_values.get(replica_str)
1923
1924 if replicas is not None:
1925 break
1926
1927 return replicas
1928
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        namespace: str = None,
        db_dict: dict = None,
    ) -> None:
        """
        Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

        :param cluster_id (str): the cluster where the KDU instance is deployed
        :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
        :param kdu_instance (str): The KDU instance in relation to which the status is obtained
        :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
        :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
            values for the keys:
            - "collection": The Mongo DB collection to write to
            - "filter": The query filter to use in the update process
            - "path": The dot separated keys which targets the object to be updated
            Defaults to None.
        """

        # best-effort task: any error obtaining or writing the status is only
        # logged, never propagated to the caller
        try:
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )

            status = detailed_status.get("info").get("description")
            self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )

            if not result:
                self.log.info("Error writing in database. Task exiting...")

        except asyncio.CancelledError as e:
            # NOTE: cancellation is swallowed (only logged) so a cancelled
            # status task does not abort the surrounding operation
            self.log.warning(
                f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
            )
        except Exception as e:
            self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1980
1981 # params for use in -f file
1982 # returns values file option and filename (in order to delete it at the end)
1983 def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
1984 if params and len(params) > 0:
1985 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1986
1987 def get_random_number():
1988 r = random.SystemRandom().randint(1, 99999999)
1989 s = str(r)
1990 while len(s) < 10:
1991 s = "0" + s
1992 return s
1993
1994 params2 = dict()
1995 for key in params:
1996 value = params.get(key)
1997 if "!!yaml" in str(value):
1998 value = yaml.safe_load(value[7:])
1999 params2[key] = value
2000
2001 values_file = get_random_number() + ".yaml"
2002 with open(values_file, "w") as stream:
2003 yaml.dump(params2, stream, indent=4, default_flow_style=False)
2004
2005 return "-f {}".format(values_file), values_file
2006
2007 return "", None
2008
2009 # params for use in --set option
2010 @staticmethod
2011 def _params_to_set_option(params: dict) -> str:
2012 pairs = [
2013 f"{quote(str(key))}={quote(str(value))}"
2014 for key, value in params.items()
2015 if value is not None
2016 ]
2017 if not pairs:
2018 return ""
2019 return "--set " + ",".join(pairs)
2020
2021 @staticmethod
2022 def generate_kdu_instance_name(**kwargs):
2023 chart_name = kwargs["kdu_model"]
2024 # check embeded chart (file or dir)
2025 if chart_name.startswith("/"):
2026 # extract file or directory name
2027 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2028 # check URL
2029 elif "://" in chart_name:
2030 # extract last portion of URL
2031 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2032
2033 name = ""
2034 for c in chart_name:
2035 if c.isalpha() or c.isnumeric():
2036 name += c
2037 else:
2038 name += "-"
2039 if len(name) > 35:
2040 name = name[0:35]
2041
2042 # if does not start with alpha character, prefix 'a'
2043 if not name[0].isalpha():
2044 name = "a" + name
2045
2046 name += "-"
2047
2048 def get_random_number():
2049 r = random.SystemRandom().randint(1, 99999999)
2050 s = str(r)
2051 s = s.rjust(10, "0")
2052 return s
2053
2054 name = name + get_random_number()
2055 return name.lower()
2056
2057 def _split_version(self, kdu_model: str) -> tuple[str, str]:
2058 version = None
2059 if (
2060 not (
2061 self._is_helm_chart_a_file(kdu_model)
2062 or self._is_helm_chart_a_url(kdu_model)
2063 )
2064 and ":" in kdu_model
2065 ):
2066 parts = kdu_model.split(sep=":")
2067 if len(parts) == 2:
2068 version = str(parts[1])
2069 kdu_model = parts[0]
2070 return kdu_model, version
2071
2072 def _split_repo(self, kdu_model: str) -> tuple[str, str]:
2073 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2074
2075 Args:
2076 kdu_model (str): Associated KDU model
2077
2078 Returns:
2079 (str, str): Tuple with the Chart name in index 0, and the repo name
2080 in index 2; if there was a problem finding them, return None
2081 for both
2082 """
2083
2084 chart_name = None
2085 repo_name = None
2086
2087 idx = kdu_model.find("/")
2088 if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
2089 chart_name = kdu_model[idx + 1 :]
2090 repo_name = kdu_model[:idx]
2091
2092 return chart_name, repo_name
2093
2094 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2095 """Obtain the Helm repository for an Helm Chart
2096
2097 Args:
2098 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2099 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2100
2101 Returns:
2102 str: the repository URL; if Helm Chart is a local one, the function returns None
2103 """
2104
2105 _, repo_name = self._split_repo(kdu_model=kdu_model)
2106
2107 repo_url = None
2108 if repo_name:
2109 # Find repository link
2110 local_repo_list = await self.repo_list(cluster_uuid)
2111 for repo in local_repo_list:
2112 if repo["name"] == repo_name:
2113 repo_url = repo["url"]
2114 break # it is not necessary to continue the loop if the repo link was found...
2115
2116 return repo_url
2117
2118 def _repo_to_oci_url(self, repo):
2119 db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
2120 if db_repo and "oci" in db_repo:
2121 return db_repo.get("url")
2122
2123 async def _prepare_helm_chart(self, kdu_model, cluster_id):
2124 # e.g.: "stable/openldap", "1.0"
2125 kdu_model, version = self._split_version(kdu_model)
2126 # e.g.: "openldap, stable"
2127 chart_name, repo = self._split_repo(kdu_model)
2128 if repo and chart_name: # repo/chart case
2129 oci_url = self._repo_to_oci_url(repo)
2130 if oci_url: # oci does not require helm repo update
2131 kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
2132 else:
2133 await self.repo_update(cluster_id, repo)
2134 return kdu_model, version
2135
2136 async def create_certificate(
2137 self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2138 ):
2139 paths, env = self._init_paths_env(
2140 cluster_name=cluster_uuid, create_if_not_exist=True
2141 )
2142 kubectl = Kubectl(config_file=paths["kube_config"])
2143 await kubectl.create_certificate(
2144 namespace=namespace,
2145 name=name,
2146 dns_prefix=dns_prefix,
2147 secret_name=secret_name,
2148 usages=[usage],
2149 issuer_name="ca-issuer",
2150 )
2151
2152 async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2153 paths, env = self._init_paths_env(
2154 cluster_name=cluster_uuid, create_if_not_exist=True
2155 )
2156 kubectl = Kubectl(config_file=paths["kube_config"])
2157 await kubectl.delete_certificate(namespace, certificate_name)
2158
2159 async def create_namespace(
2160 self,
2161 namespace,
2162 cluster_uuid,
2163 labels,
2164 ):
2165 """
2166 Create a namespace in a specific cluster
2167
2168 :param namespace: Namespace to be created
2169 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2170 :param labels: Dictionary with labels for the new namespace
2171 :returns: None
2172 """
2173 paths, env = self._init_paths_env(
2174 cluster_name=cluster_uuid, create_if_not_exist=True
2175 )
2176 kubectl = Kubectl(config_file=paths["kube_config"])
2177 await kubectl.create_namespace(
2178 name=namespace,
2179 labels=labels,
2180 )
2181
2182 async def delete_namespace(
2183 self,
2184 namespace,
2185 cluster_uuid,
2186 ):
2187 """
2188 Delete a namespace in a specific cluster
2189
2190 :param namespace: namespace to be deleted
2191 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2192 :returns: None
2193 """
2194 paths, env = self._init_paths_env(
2195 cluster_name=cluster_uuid, create_if_not_exist=True
2196 )
2197 kubectl = Kubectl(config_file=paths["kube_config"])
2198 await kubectl.delete_namespace(
2199 name=namespace,
2200 )
2201
2202 async def copy_secret_data(
2203 self,
2204 src_secret: str,
2205 dst_secret: str,
2206 cluster_uuid: str,
2207 data_key: str,
2208 src_namespace: str = "osm",
2209 dst_namespace: str = "osm",
2210 ):
2211 """
2212 Copy a single key and value from an existing secret to a new one
2213
2214 :param src_secret: name of the existing secret
2215 :param dst_secret: name of the new secret
2216 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2217 :param data_key: key of the existing secret to be copied
2218 :param src_namespace: Namespace of the existing secret
2219 :param dst_namespace: Namespace of the new secret
2220 :returns: None
2221 """
2222 paths, env = self._init_paths_env(
2223 cluster_name=cluster_uuid, create_if_not_exist=True
2224 )
2225 kubectl = Kubectl(config_file=paths["kube_config"])
2226 secret_data = await kubectl.get_secret_content(
2227 name=src_secret,
2228 namespace=src_namespace,
2229 )
2230 # Only the corresponding data_key value needs to be copy
2231 data = {data_key: secret_data.get(data_key)}
2232 await kubectl.create_secret(
2233 name=dst_secret,
2234 data=data,
2235 namespace=dst_namespace,
2236 secret_type="Opaque",
2237 )
2238
2239 async def setup_default_rbac(
2240 self,
2241 name,
2242 namespace,
2243 cluster_uuid,
2244 api_groups,
2245 resources,
2246 verbs,
2247 service_account,
2248 ):
2249 """
2250 Create a basic RBAC for a new namespace.
2251
2252 :param name: name of both Role and Role Binding
2253 :param namespace: K8s namespace
2254 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2255 :param api_groups: Api groups to be allowed in Policy Rule
2256 :param resources: Resources to be allowed in Policy Rule
2257 :param verbs: Verbs to be allowed in Policy Rule
2258 :param service_account: Service Account name used to bind the Role
2259 :returns: None
2260 """
2261 paths, env = self._init_paths_env(
2262 cluster_name=cluster_uuid, create_if_not_exist=True
2263 )
2264 kubectl = Kubectl(config_file=paths["kube_config"])
2265 await kubectl.create_role(
2266 name=name,
2267 labels={},
2268 namespace=namespace,
2269 api_groups=api_groups,
2270 resources=resources,
2271 verbs=verbs,
2272 )
2273 await kubectl.create_role_binding(
2274 name=name,
2275 labels={},
2276 namespace=namespace,
2277 role_name=name,
2278 sa_name=service_account,
2279 )