Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 from shlex import quote
26 import random
27 import time
28 import shlex
29 import shutil
30 import stat
31 import os
32 import yaml
33 from uuid import uuid4
34 from urllib.parse import urlparse
35
36 from n2vc.config import EnvironConfig
37 from n2vc.exceptions import K8sException
38 from n2vc.k8s_conn import K8sConnector
39 from n2vc.kubectl import Kubectl
40
41
42 class K8sHelmBaseConnector(K8sConnector):
43
44 """
45 ####################################################################################
46 ################################### P U B L I C ####################################
47 ####################################################################################
48 """
49
50 service_account = "osm"
51
    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """Base constructor shared by the helm-based K8s connectors.

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database

        NOTE(review): `_check_file_exists` presumably raises when the binary is
        missing — confirm the exception type in the parent/helper definition.
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        self.config = EnvironConfig()
        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # obtain stable repo url from config or apply default
        self._stable_repo_url = self.config.get("stablerepourl")
        if self._stable_repo_url == "None":
            # the literal string "None" in the environment config means
            # "no stable repository configured"
            self._stable_repo_url = None

        # Lock to avoid concurrent execution of helm commands
        self.cmd_lock = asyncio.Lock()
98
99 def _get_namespace(self, cluster_uuid: str) -> str:
100 """
101 Obtains the namespace used by the cluster with the uuid passed by argument
102
103 param: cluster_uuid: cluster's uuid
104 """
105
106 # first, obtain the cluster corresponding to the uuid passed by argument
107 k8scluster = self.db.get_one(
108 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
109 )
110 return k8scluster.get("namespace")
111
112 async def init_env(
113 self,
114 k8s_creds: str,
115 namespace: str = "kube-system",
116 reuse_cluster_uuid=None,
117 **kwargs,
118 ) -> tuple[str, bool]:
119 """
120 It prepares a given K8s cluster environment to run Charts
121
122 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
123 '.kube/config'
124 :param namespace: optional namespace to be used for helm. By default,
125 'kube-system' will be used
126 :param reuse_cluster_uuid: existing cluster uuid for reuse
127 :param kwargs: Additional parameters (None yet)
128 :return: uuid of the K8s cluster and True if connector has installed some
129 software in the cluster
130 (on error, an exception will be raised)
131 """
132
133 if reuse_cluster_uuid:
134 cluster_id = reuse_cluster_uuid
135 else:
136 cluster_id = str(uuid4())
137
138 self.log.debug(
139 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
140 )
141
142 paths, env = self._init_paths_env(
143 cluster_name=cluster_id, create_if_not_exist=True
144 )
145 mode = stat.S_IRUSR | stat.S_IWUSR
146 with open(paths["kube_config"], "w", mode) as f:
147 f.write(k8s_creds)
148 os.chmod(paths["kube_config"], 0o600)
149
150 # Code with initialization specific of helm version
151 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
152
153 # sync fs with local data
154 self.fs.reverse_sync(from_path=cluster_id)
155
156 self.log.info("Cluster {} initialized".format(cluster_id))
157
158 return cluster_id, n2vc_installed_sw
159
    async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = "chart",
        cert: str = None,
        user: str = None,
        password: str = None,
        oci: bool = False,
    ):
        """Register a helm chart repository (or log in to an OCI registry).

        :param cluster_uuid: UUID of the target K8s cluster
        :param name: local name to register the repository under
        :param url: repository url (may be an 'oci://' reference when oci=True)
        :param repo_type: repository type, used only for logging here
        :param cert: optional CA certificate content (PEM) for the repository
        :param user: optional username for authenticated repositories
        :param password: optional password for authenticated repositories
        :param oci: if True, perform 'helm registry login' instead of 'repo add'
        """
        self.log.debug(
            "Cluster {}, adding {} repository {}. URL: {}".format(
                cluster_uuid, repo_type, name, url
            )
        )

        # init_env
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        if oci:
            if user and password:
                # registry login wants host[:port], not the full oci:// url
                host_port = urlparse(url).netloc if url.startswith("oci://") else url
                # helm registry login url
                command = "env KUBECONFIG={} {} registry login {}".format(
                    paths["kube_config"], self._helm_command, quote(host_port)
                )
            else:
                # anonymous OCI registries need no login at all
                self.log.debug(
                    "OCI registry login is not needed for repo: {}".format(name)
                )
                return
        else:
            # helm repo add name url
            command = "env KUBECONFIG={} {} repo add {} {}".format(
                paths["kube_config"], self._helm_command, quote(name), quote(url)
            )

        if cert:
            # persist the CA cert under the cluster's fs tree so helm can read it
            temp_cert_file = os.path.join(
                self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
            )
            os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
            with open(temp_cert_file, "w") as the_cert:
                the_cert.write(cert)
            command += " --ca-file {}".format(quote(temp_cert_file))

        if user:
            command += " --username={}".format(quote(user))

        if password:
            command += " --password={}".format(quote(password))

        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if not oci:
            # helm repo update — refresh the index right after adding;
            # failures here are tolerated (raise_exception_on_error=False)
            command = "env KUBECONFIG={} {} repo update {}".format(
                paths["kube_config"], self._helm_command, quote(name)
            )
            self.log.debug("updating repo: {}".format(command))
            await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)
235
236 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
237 self.log.debug(
238 "Cluster {}, updating {} repository {}".format(
239 cluster_uuid, repo_type, name
240 )
241 )
242
243 # init_env
244 paths, env = self._init_paths_env(
245 cluster_name=cluster_uuid, create_if_not_exist=True
246 )
247
248 # sync local dir
249 self.fs.sync(from_path=cluster_uuid)
250
251 # helm repo update
252 command = "{} repo update {}".format(self._helm_command, quote(name))
253 self.log.debug("updating repo: {}".format(command))
254 await self._local_async_exec(
255 command=command, raise_exception_on_error=False, env=env
256 )
257
258 # sync fs
259 self.fs.reverse_sync(from_path=cluster_uuid)
260
261 async def repo_list(self, cluster_uuid: str) -> list:
262 """
263 Get the list of registered repositories
264
265 :return: list of registered repositories: [ (name, url) .... ]
266 """
267
268 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
269
270 # config filename
271 paths, env = self._init_paths_env(
272 cluster_name=cluster_uuid, create_if_not_exist=True
273 )
274
275 # sync local dir
276 self.fs.sync(from_path=cluster_uuid)
277
278 command = "env KUBECONFIG={} {} repo list --output yaml".format(
279 paths["kube_config"], self._helm_command
280 )
281
282 # Set exception to false because if there are no repos just want an empty list
283 output, _rc = await self._local_async_exec(
284 command=command, raise_exception_on_error=False, env=env
285 )
286
287 # sync fs
288 self.fs.reverse_sync(from_path=cluster_uuid)
289
290 if _rc == 0:
291 if output and len(output) > 0:
292 repos = yaml.load(output, Loader=yaml.SafeLoader)
293 # unify format between helm2 and helm3 setting all keys lowercase
294 return self._lower_keys_list(repos)
295 else:
296 return []
297 else:
298 return []
299
300 async def repo_remove(self, cluster_uuid: str, name: str):
301 self.log.debug(
302 "remove {} repositories for cluster {}".format(name, cluster_uuid)
303 )
304
305 # init env, paths
306 paths, env = self._init_paths_env(
307 cluster_name=cluster_uuid, create_if_not_exist=True
308 )
309
310 # sync local dir
311 self.fs.sync(from_path=cluster_uuid)
312
313 command = "env KUBECONFIG={} {} repo remove {}".format(
314 paths["kube_config"], self._helm_command, quote(name)
315 )
316 await self._local_async_exec(
317 command=command, raise_exception_on_error=True, env=env
318 )
319
320 # sync fs
321 self.fs.reverse_sync(from_path=cluster_uuid)
322
    async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False,
        **kwargs,
    ) -> bool:
        """Reset a cluster

        Resets the Kubernetes cluster by removing the helm deployment that represents it.

        :param cluster_uuid: The UUID of the cluster to reset
        :param force: if True, uninstall all existing releases before resetting
        :param uninstall_sw: if True, also uninstall the software this connector
            previously installed in the cluster
        :param kwargs: Additional parameters (None yet)
        :return: Returns True if successful or raises an exception.
        """
        namespace = self._get_namespace(cluster_uuid=cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_uuid, uninstall_sw
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("name")
                            chart = r.get("chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            # will not raise exception as it was found
                            # that in some cases of previously installed helm releases it
                            # raised an error
                            self.log.warn(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                    ).format(cluster_uuid)
                    self.log.warn(msg)
                    uninstall_sw = (
                        False  # Allow to remove k8s cluster without removing Tiller
                    )

        if uninstall_sw:
            await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

        # delete cluster directory
        self.log.debug("Removing directory {}".format(cluster_uuid))
        self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
        # Remove also the local directory if it still exists
        direct = self.fs.path + "/" + cluster_uuid
        shutil.rmtree(direct, ignore_errors=True)

        return True
394
395 def _is_helm_chart_a_file(self, chart_name: str):
396 return chart_name.count("/") > 1
397
398 @staticmethod
399 def _is_helm_chart_a_url(chart_name: str):
400 result = urlparse(chart_name)
401 return all([result.scheme, result.netloc])
402
    async def _install_impl(
        self,
        cluster_id: str,
        kdu_model: str,
        paths: dict,
        env: dict,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """Run the actual 'helm install' for a KDU, tracking status in the DB.

        :param cluster_id: id of the target cluster
        :param kdu_model: chart reference (repo/chart, url or local path)
        :param paths: env paths dict (NOTE: immediately recomputed below)
        :param env: env vars dict (NOTE: immediately recomputed below)
        :param kdu_instance: release name for the installation
        :param atomic: if True, run install concurrently with status polling
        :param timeout: helm timeout in seconds
        :param params: optional chart values, written to a temp values file
        :param db_dict: DB location for status updates
        :param kdu_name: KDU name (unused here; kept for subclass signatures)
        :param namespace: target k8s namespace
        :raises K8sException: if the helm command exits with a non-zero code
        """
        # init env, paths — the incoming paths/env arguments are overwritten here
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task (polls while the install runs)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)
491
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        namespace: str = None,
        force: bool = False,
    ):
        """Upgrade a deployed KDU instance via 'helm upgrade'.

        :param cluster_uuid: UUID of the target cluster
        :param kdu_instance: release name to upgrade
        :param kdu_model: chart reference to upgrade to
        :param atomic: if True, run upgrade concurrently with status polling
        :param timeout: helm timeout in seconds
        :param params: optional chart values, written to a temp values file
        :param db_dict: DB location for status updates
        :param namespace: target namespace; looked up from the release if omitted
        :param force: passed through to the helm upgrade command builder
        :return: the new revision number, or 0 if the instance cannot be found
        :raises K8sException: if the instance is unknown or helm fails
        """
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace

        # set namespace
        if not namespace:
            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
            if not instance_info:
                raise K8sException("kdu_instance {} not found".format(kdu_instance))
            namespace = instance_info["namespace"]

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_uuid, params=params
        )

        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
            force,
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task (polls while the upgrade runs)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="upgrade",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
608
    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        cluster_uuid: str = None,
        kdu_model: str = None,
        atomic: bool = True,
        db_dict: dict = None,
        **kwargs,
    ):
        """Scale a resource in a Helm Chart.

        Args:
            kdu_instance: KDU instance name
            scale: Scale to which to set the resource
            resource_name: Resource name
            total_timeout: The time, in seconds, to wait
            cluster_uuid: The UUID of the cluster
            kdu_model: The chart reference
            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            db_dict: Dictionary for any additional data
            kwargs: Additional parameters

        Returns:
            True if successful, False otherwise

        Raises:
            K8sException: if the instance is unknown or helm fails
        """

        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_mgs = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_mgs)

        # look for instance to obtain namespace
        # get_instance_info function calls the sync command
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # version
        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)

        repo_url = await self._find_repo(kdu_model, cluster_uuid)

        # replica_str identifies which values key controls the replica count
        _, replica_str = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        command = self._get_upgrade_scale_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            scale,
            version,
            atomic,
            replica_str,
            total_timeout,
            resource_name,
            paths["kube_config"],
        )

        self.log.debug("scaling: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task (polls while the scale runs)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="scale",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="scale",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return True
730
    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        cluster_uuid: str,
        kdu_model: str,
        **kwargs,
    ) -> int:
        """Get a resource scale count.

        Args:
            cluster_uuid: The UUID of the cluster
            resource_name: Resource name
            kdu_instance: KDU instance name
            kdu_model: The name or path of an Helm Chart
            kwargs: Additional parameters

        Returns:
            Resource instance count

        Raises:
            K8sException: if the instance is unknown or no replica count
                can be determined from instance values or chart defaults
        """

        self.log.debug(
            "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
        )

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, _ = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # first try the replica count from the deployed instance's values
        replicas = await self._get_replica_count_instance(
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            kubeconfig=paths["kube_config"],
            resource_name=resource_name,
        )

        self.log.debug(
            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
        )

        # Get default value if scale count is not found from provided values
        # Important note: this piece of code shall only be executed in the first scaling operation,
        # since it is expected that the _get_replica_count_instance is able to obtain the number of
        # replicas when a scale operation was already conducted previously for this KDU/resource!
        if replicas is None:
            repo_url = await self._find_repo(
                kdu_model=kdu_model, cluster_uuid=cluster_uuid
            )
            replicas, _ = await self._get_replica_count_url(
                kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
            )

            self.log.debug(
                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
                f"{resource_name} obtained: {replicas}"
            )

            if replicas is None:
                msg = "Replica count not found. Cannot be scaled"
                self.log.error(msg)
                raise K8sException(msg)

        return int(replicas)
800
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """Roll back a KDU instance to a previous helm revision.

        :param cluster_uuid: UUID of the target cluster
        :param kdu_instance: release name to roll back
        :param revision: target revision (0 lets helm pick the previous one)
        :param db_dict: DB location for status updates
        :return: the new revision number, or 0 if the instance cannot be found
        :raises K8sException: if the instance is unknown or helm fails
        """
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_rollback_command(
            kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
        )

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task (polls while the rollback runs)
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
884
885 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
886 """
887 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
888 (this call should happen after all _terminate-config-primitive_ of the VNF
889 are invoked).
890
891 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
892 :param kdu_instance: unique name for the KDU instance to be deleted
893 :param kwargs: Additional parameters (None yet)
894 :return: True if successful
895 """
896
897 self.log.debug(
898 "uninstall kdu_instance {} from cluster {}".format(
899 kdu_instance, cluster_uuid
900 )
901 )
902
903 # sync local dir
904 self.fs.sync(from_path=cluster_uuid)
905
906 # look for instance to obtain namespace
907 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
908 if not instance_info:
909 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
910 return True
911 # init env, paths
912 paths, env = self._init_paths_env(
913 cluster_name=cluster_uuid, create_if_not_exist=True
914 )
915
916 # sync local dir
917 self.fs.sync(from_path=cluster_uuid)
918
919 command = self._get_uninstall_command(
920 kdu_instance, instance_info["namespace"], paths["kube_config"]
921 )
922 output, _rc = await self._local_async_exec(
923 command=command, raise_exception_on_error=True, env=env
924 )
925
926 # sync fs
927 self.fs.reverse_sync(from_path=cluster_uuid)
928
929 return self._output_to_table(output)
930
931 async def instances_list(self, cluster_uuid: str) -> list:
932 """
933 returns a list of deployed releases in a cluster
934
935 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
936 :return:
937 """
938
939 self.log.debug("list releases for cluster {}".format(cluster_uuid))
940
941 # sync local dir
942 self.fs.sync(from_path=cluster_uuid)
943
944 # execute internal command
945 result = await self._instances_list(cluster_uuid)
946
947 # sync fs
948 self.fs.reverse_sync(from_path=cluster_uuid)
949
950 return result
951
952 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
953 instances = await self.instances_list(cluster_uuid=cluster_uuid)
954 for instance in instances:
955 if instance.get("name") == kdu_instance:
956 return instance
957 self.log.debug("Instance {} not found".format(kdu_instance))
958 return None
959
960 async def upgrade_charm(
961 self,
962 ee_id: str = None,
963 path: str = None,
964 charm_id: str = None,
965 charm_type: str = None,
966 timeout: float = None,
967 ) -> str:
968 """This method upgrade charms in VNFs
969
970 Args:
971 ee_id: Execution environment id
972 path: Local path to the charm
973 charm_id: charm-id
974 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
975 timeout: (Float) Timeout for the ns update operation
976
977 Returns:
978 The output of the update operation if status equals to "completed"
979 """
980 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
981
982 async def exec_primitive(
983 self,
984 cluster_uuid: str = None,
985 kdu_instance: str = None,
986 primitive_name: str = None,
987 timeout: float = 300,
988 params: dict = None,
989 db_dict: dict = None,
990 **kwargs,
991 ) -> str:
992 """Exec primitive (Juju action)
993
994 :param cluster_uuid: The UUID of the cluster or namespace:cluster
995 :param kdu_instance: The unique name of the KDU instance
996 :param primitive_name: Name of action that will be executed
997 :param timeout: Timeout for action execution
998 :param params: Dictionary of all the parameters needed for the action
999 :db_dict: Dictionary for any additional data
1000 :param kwargs: Additional parameters (None yet)
1001
1002 :return: Returns the output of the action
1003 """
1004 raise K8sException(
1005 "KDUs deployed with Helm don't support actions "
1006 "different from rollback, upgrade and status"
1007 )
1008
1009 async def get_services(
1010 self, cluster_uuid: str, kdu_instance: str, namespace: str
1011 ) -> list:
1012 """
1013 Returns a list of services defined for the specified kdu instance.
1014
1015 :param cluster_uuid: UUID of a K8s cluster known by OSM
1016 :param kdu_instance: unique name for the KDU instance
1017 :param namespace: K8s namespace used by the KDU instance
1018 :return: If successful, it will return a list of services, Each service
1019 can have the following data:
1020 - `name` of the service
1021 - `type` type of service in the k8 cluster
1022 - `ports` List of ports offered by the service, for each port includes at least
1023 name, port, protocol
1024 - `cluster_ip` Internal ip to be used inside k8s cluster
1025 - `external_ip` List of external ips (in case they are available)
1026 """
1027
1028 self.log.debug(
1029 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1030 cluster_uuid, kdu_instance
1031 )
1032 )
1033
1034 # init env, paths
1035 paths, env = self._init_paths_env(
1036 cluster_name=cluster_uuid, create_if_not_exist=True
1037 )
1038
1039 # sync local dir
1040 self.fs.sync(from_path=cluster_uuid)
1041
1042 # get list of services names for kdu
1043 service_names = await self._get_services(
1044 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1045 )
1046
1047 service_list = []
1048 for service in service_names:
1049 service = await self._get_service(cluster_uuid, service, namespace)
1050 service_list.append(service)
1051
1052 # sync fs
1053 self.fs.reverse_sync(from_path=cluster_uuid)
1054
1055 return service_list
1056
1057 async def get_service(
1058 self, cluster_uuid: str, service_name: str, namespace: str
1059 ) -> object:
1060 self.log.debug(
1061 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1062 service_name, namespace, cluster_uuid
1063 )
1064 )
1065
1066 # sync local dir
1067 self.fs.sync(from_path=cluster_uuid)
1068
1069 service = await self._get_service(cluster_uuid, service_name, namespace)
1070
1071 # sync fs
1072 self.fs.reverse_sync(from_path=cluster_uuid)
1073
1074 return service
1075
    async def status_kdu(
        self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
    ) -> Union[str, dict]:
        """
        This call would retrieve tha current state of a given KDU instance. It would be
        would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
        values_ of the configuration parameters applied to a given instance. This call
        would be based on the `status` call.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :param yaml_format: if the return shall be returned as an YAML string or as a
            dictionary
        :return: If successful, it will return the following vector of arguments:
            - K8s `namespace` in the cluster where the KDU lives
            - `state` of the KDU instance. It can be:
                - UNKNOWN
                - DEPLOYED
                - DELETED
                - SUPERSEDED
                - FAILED or
                - DELETING
            - List of `resources` (objects) that this release consists of, sorted by kind,
            and the status of those resources
            - Last `deployment_time`.

        :raises K8sException: if the instance does not exist in the cluster
        """
        self.log.debug(
            "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # get instance: needed to obtain namespace
        instances = await self._instances_list(cluster_id=cluster_uuid)
        for instance in instances:
            if instance.get("name") == kdu_instance:
                break
        else:
            # for/else: no break happened, so the instance does not exist
            raise K8sException(
                "Instance name: {} not found in cluster: {}".format(
                    kdu_instance, cluster_uuid
                )
            )

        status = await self._status_kdu(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance["namespace"],
            yaml_format=yaml_format,
            show_error_log=True,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return status
1138
1139 async def get_values_kdu(
1140 self, kdu_instance: str, namespace: str, kubeconfig: str
1141 ) -> str:
1142 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1143
1144 return await self._exec_get_command(
1145 get_command="values",
1146 kdu_instance=kdu_instance,
1147 namespace=namespace,
1148 kubeconfig=kubeconfig,
1149 )
1150
1151 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1152 """Method to obtain the Helm Chart package's values
1153
1154 Args:
1155 kdu_model: The name or path of an Helm Chart
1156 repo_url: Helm Chart repository url
1157
1158 Returns:
1159 str: the values of the Helm Chart package
1160 """
1161
1162 self.log.debug(
1163 "inspect kdu_model values {} from (optional) repo: {}".format(
1164 kdu_model, repo_url
1165 )
1166 )
1167
1168 return await self._exec_inspect_command(
1169 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1170 )
1171
1172 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1173 self.log.debug(
1174 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1175 )
1176
1177 return await self._exec_inspect_command(
1178 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1179 )
1180
1181 async def synchronize_repos(self, cluster_uuid: str):
1182 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1183 try:
1184 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1185 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1186
1187 local_repo_list = await self.repo_list(cluster_uuid)
1188 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1189
1190 deleted_repo_list = []
1191 added_repo_dict = {}
1192
1193 # iterate over the list of repos in the database that should be
1194 # added if not present
1195 for repo_name, db_repo in db_repo_dict.items():
1196 try:
1197 # check if it is already present
1198 curr_repo_url = local_repo_dict.get(db_repo["name"])
1199 repo_id = db_repo.get("_id")
1200 if curr_repo_url != db_repo["url"]:
1201 if curr_repo_url:
1202 self.log.debug(
1203 "repo {} url changed, delete and and again".format(
1204 db_repo["url"]
1205 )
1206 )
1207 await self.repo_remove(cluster_uuid, db_repo["name"])
1208 deleted_repo_list.append(repo_id)
1209
1210 # add repo
1211 self.log.debug("add repo {}".format(db_repo["name"]))
1212 await self.repo_add(
1213 cluster_uuid,
1214 db_repo["name"],
1215 db_repo["url"],
1216 cert=db_repo.get("ca_cert"),
1217 user=db_repo.get("user"),
1218 password=db_repo.get("password"),
1219 oci=db_repo.get("oci", False),
1220 )
1221 added_repo_dict[repo_id] = db_repo["name"]
1222 except Exception as e:
1223 raise K8sException(
1224 "Error adding repo id: {}, err_msg: {} ".format(
1225 repo_id, repr(e)
1226 )
1227 )
1228
1229 # Delete repos that are present but not in nbi_list
1230 for repo_name in local_repo_dict:
1231 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1232 self.log.debug("delete repo {}".format(repo_name))
1233 try:
1234 await self.repo_remove(cluster_uuid, repo_name)
1235 deleted_repo_list.append(repo_name)
1236 except Exception as e:
1237 self.warning(
1238 "Error deleting repo, name: {}, err_msg: {}".format(
1239 repo_name, str(e)
1240 )
1241 )
1242
1243 return deleted_repo_list, added_repo_dict
1244
1245 except K8sException:
1246 raise
1247 except Exception as e:
1248 # Do not raise errors synchronizing repos
1249 self.log.error("Error synchronizing repos: {}".format(e))
1250 raise Exception("Error synchronizing repos: {}".format(e))
1251
1252 def _get_db_repos_dict(self, repo_ids: list):
1253 db_repos_dict = {}
1254 for repo_id in repo_ids:
1255 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1256 db_repos_dict[db_repo["name"]] = db_repo
1257 return db_repos_dict
1258
1259 """
1260 ####################################################################################
1261 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1262 ####################################################################################
1263 """
1264
    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns base cluster and kube dirs and returns them.
        Also creates helm3 dirs according to new directory specification; paths are
        not returned but assigned to helm environment variables.

        :param cluster_name: cluster_name
        :param create_if_not_exist: create the directories when they are missing
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
1275
    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization

        :param cluster_id: id of the K8s cluster to initialize
        :param namespace: K8s namespace used for the initialization
        :param paths: config paths dictionary (see _init_paths_env)
        :param env: helm environment variables dictionary (see _init_paths_env)
        """
1281
    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent helm instances list

        :param cluster_id: id of a K8s cluster known by OSM
        :return: list of dictionaries describing the deployed releases; each entry
            exposes at least the "name" and "namespace" keys (see status_kdu usage)
        """
1287
    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        """
        Implements the helm version dependent method to obtain services from a helm instance

        :param cluster_id: id of a K8s cluster known by OSM
        :param kdu_instance: unique name of the deployed KDU instance
        :param namespace: K8s namespace where the instance is deployed
        :param kubeconfig: kubeconfig file path
        :return: list of service names belonging to the instance
        """
1293
    @abc.abstractmethod
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """
        Implements the helm version dependent method to obtain status of a helm instance

        :param cluster_id: id of a K8s cluster known by OSM
        :param kdu_instance: unique name of the deployed KDU instance
        :param namespace: K8s namespace of the instance (optional)
        :param yaml_format: when True, return the status as a YAML string,
            otherwise as a dictionary
        :param show_error_log: log failures of the underlying status command
        :return: the helm status, as a YAML string or as a dictionary
        """
1306
    @abc.abstractmethod
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """
        Obtain command to be executed to install the indicated instance
        (the previous docstring wrongly said "delete")

        :param kdu_model: chart name, path or reference to install
        :param kdu_instance: name for the new release
        :param namespace: K8s namespace to install into
        :param params_str: already-rendered parameter options (e.g. --set / -f)
        :param version: chart version constraint (optional)
        :param atomic: roll changes back automatically on a failed install
        :param timeout: time, in seconds, to wait
        :param kubeconfig: kubeconfig file path
        :return: the full command line string
        """
1322
    @abc.abstractmethod
    def _get_upgrade_scale_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        count,
        version,
        atomic,
        replicas,
        timeout,
        resource_name,
        kubeconfig,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            count (int): Scale count
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replicas (str): The key under resource_name key where the scale count is stored
                (presumably the replica-count key name — confirm against the subclass implementation)
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """
1355
    @abc.abstractmethod
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
        force,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            force (bool): If set, helm forces resource updates through a replacement strategy.
                This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """
1385
    @abc.abstractmethod
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """
        Obtain command to be executed to rollback the indicated instance

        :param kdu_instance: name of the release to roll back
        :param namespace: K8s namespace of the release
        :param revision: revision to roll back to
        :param kubeconfig: kubeconfig file path
        :return: the full command line string
        """
1393
    @abc.abstractmethod
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """
        Obtain command to be executed to delete the indicated instance

        :param kdu_instance: name of the release to delete
        :param namespace: K8s namespace of the release
        :param kubeconfig: kubeconfig file path
        :return: the full command line string
        """
1401
    @abc.abstractmethod
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generates the command to obtain the information about an Helm Chart package
        (´helm show ...´ command)

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of an Helm Chart
            repo_str: pre-rendered repository option (" --repo <url>" or empty
                string — see _exec_inspect_command)
            version: pre-rendered version option ("--version <x>" or empty
                string — see _exec_inspect_command)

        Returns:
            str: the generated Helm Chart command
        """
1418
    @abc.abstractmethod
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance.

        :param get_command: helm `get` sub-command (e.g. "values" — see get_values_kdu)
        :param kdu_instance: name of the release to query
        :param namespace: K8s namespace of the release
        :param kubeconfig: kubeconfig file path
        :return: the full command line string
        """
1424
    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method call to uninstall cluster software for helm. This method is dependent
        of helm version
        For Helm v2 it will be called when Tiller must be uninstalled
        For Helm v3 it does nothing and does not need to be called

        :param cluster_id: id of a K8s cluster known by OSM
        :param namespace: K8s namespace where the software was installed
        """
1433
    @abc.abstractmethod
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the cluster repos identifiers

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :return: list of repo ids usable as "_id" in the "k8srepos" DB collection
            (see _get_db_repos_dict)
        """
1439
1440 """
1441 ####################################################################################
1442 ################################### P R I V A T E ##################################
1443 ####################################################################################
1444 """
1445
1446 @staticmethod
1447 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1448 if os.path.exists(filename):
1449 return True
1450 else:
1451 msg = "File {} does not exist".format(filename)
1452 if exception_if_not_exists:
1453 raise K8sException(msg)
1454
1455 @staticmethod
1456 def _remove_multiple_spaces(strobj):
1457 strobj = strobj.strip()
1458 while " " in strobj:
1459 strobj = strobj.replace(" ", " ")
1460 return strobj
1461
1462 @staticmethod
1463 def _output_to_lines(output: str) -> list:
1464 output_lines = list()
1465 lines = output.splitlines(keepends=False)
1466 for line in lines:
1467 line = line.strip()
1468 if len(line) > 0:
1469 output_lines.append(line)
1470 return output_lines
1471
1472 @staticmethod
1473 def _output_to_table(output: str) -> list:
1474 output_table = list()
1475 lines = output.splitlines(keepends=False)
1476 for line in lines:
1477 line = line.replace("\t", " ")
1478 line_list = list()
1479 output_table.append(line_list)
1480 cells = line.split(sep=" ")
1481 for cell in cells:
1482 cell = cell.strip()
1483 if len(cell) > 0:
1484 line_list.append(cell)
1485 return output_table
1486
1487 @staticmethod
1488 def _parse_services(output: str) -> list:
1489 lines = output.splitlines(keepends=False)
1490 services = []
1491 for line in lines:
1492 line = line.replace("\t", " ")
1493 cells = line.split(sep=" ")
1494 if len(cells) > 0 and cells[0].startswith("service/"):
1495 elems = cells[0].split(sep="/")
1496 if len(elems) > 1:
1497 services.append(elems[1])
1498 return services
1499
1500 @staticmethod
1501 def _get_deep(dictionary: dict, members: tuple):
1502 target = dictionary
1503 value = None
1504 try:
1505 for m in members:
1506 value = target.get(m)
1507 if not value:
1508 return None
1509 else:
1510 target = value
1511 except Exception:
1512 pass
1513 return value
1514
1515 # find key:value in several lines
1516 @staticmethod
1517 def _find_in_lines(p_lines: list, p_key: str) -> str:
1518 for line in p_lines:
1519 try:
1520 if line.startswith(p_key + ":"):
1521 parts = line.split(":")
1522 the_value = parts[1].strip()
1523 return the_value
1524 except Exception:
1525 # ignore it
1526 pass
1527 return None
1528
1529 @staticmethod
1530 def _lower_keys_list(input_list: list):
1531 """
1532 Transform the keys in a list of dictionaries to lower case and returns a new list
1533 of dictionaries
1534 """
1535 new_list = []
1536 if input_list:
1537 for dictionary in input_list:
1538 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1539 new_list.append(new_dict)
1540 return new_list
1541
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ) -> tuple[str, int]:
        """
        Run a local command asynchronously and return its output and return code.

        :param command: shell-like command line; consecutive spaces are collapsed
            and the string is tokenized with shlex (no shell is involved)
        :param raise_exception_on_error: raise K8sException on non-zero return
            code or on unexpected failures, instead of returning an error tuple
        :param show_error_log: log the output at debug level when the return
            code is non-zero
        :param encode_utf8: post-process the output through a utf-8 encode cycle
        :param env: extra environment variables overlaid on os.environ
        :return: tuple (output, return_code); ("", -1) on unexpected errors when
            raise_exception_on_error is not set
        """
        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command = shlex.split(command)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # serialize subprocess execution; the lock is released before the
            # output post-processing below
            async with self.cmd_lock:
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=environ,
                )

                # wait for command terminate
                stdout, stderr = await process.communicate()

                return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # NOTE: stderr, when present, replaces (not appends to) the
                # stdout output
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # first, kill the process if it is still running
            if process.returncode is None:
                process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1614
    async def _local_async_exec_pipe(
        self,
        command1: str,
        command2: str,
        raise_exception_on_error: bool = True,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ):
        """
        Run ``command1 | command2`` locally (via an os.pipe, no shell) and return
        command2's output and return code.

        :param command1: first command; its stdout feeds command2's stdin
        :param command2: second command; its output/return code are reported
        :param raise_exception_on_error: raise K8sException on non-zero return
            code or on unexpected failures, instead of returning an error tuple
        :param show_error_log: log the output at debug level when the return
            code is non-zero
        :param encode_utf8: post-process the output through a utf-8 encode cycle
        :param env: extra environment variables overlaid on os.environ
        :return: tuple (output, return_code) of command2; ("", -1) on unexpected
            errors when raise_exception_on_error is not set
        """
        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            async with self.cmd_lock:
                read, write = os.pipe()
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                # close our copy of the write end so process_2 sees EOF when
                # process_1 finishes
                os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
                os.close(read)
                stdout, stderr = await process_2.communicate()

                return_code = process_2.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # NOTE: stderr, when present, replaces (not appends to) the
                # stdout output
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # first, kill the processes if they are still running
            for process in (process_1, process_2):
                if process.returncode is None:
                    process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1692
1693 async def _get_service(self, cluster_id, service_name, namespace):
1694 """
1695 Obtains the data of the specified service in the k8cluster.
1696
1697 :param cluster_id: id of a K8s cluster known by OSM
1698 :param service_name: name of the K8s service in the specified namespace
1699 :param namespace: K8s namespace used by the KDU instance
1700 :return: If successful, it will return a service with the following data:
1701 - `name` of the service
1702 - `type` type of service in the k8 cluster
1703 - `ports` List of ports offered by the service, for each port includes at least
1704 name, port, protocol
1705 - `cluster_ip` Internal ip to be used inside k8s cluster
1706 - `external_ip` List of external ips (in case they are available)
1707 """
1708
1709 # init config, env
1710 paths, env = self._init_paths_env(
1711 cluster_name=cluster_id, create_if_not_exist=True
1712 )
1713
1714 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1715 self.kubectl_command,
1716 paths["kube_config"],
1717 quote(namespace),
1718 quote(service_name),
1719 )
1720
1721 output, _rc = await self._local_async_exec(
1722 command=command, raise_exception_on_error=True, env=env
1723 )
1724
1725 data = yaml.load(output, Loader=yaml.SafeLoader)
1726
1727 service = {
1728 "name": service_name,
1729 "type": self._get_deep(data, ("spec", "type")),
1730 "ports": self._get_deep(data, ("spec", "ports")),
1731 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1732 }
1733 if service["type"] == "LoadBalancer":
1734 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1735 ip_list = [elem["ip"] for elem in ip_map_list]
1736 service["external_ip"] = ip_list
1737
1738 return service
1739
1740 async def _exec_get_command(
1741 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1742 ):
1743 """Obtains information about the kdu instance."""
1744
1745 full_command = self._get_get_command(
1746 get_command, kdu_instance, namespace, kubeconfig
1747 )
1748
1749 output, _rc = await self._local_async_exec(command=full_command)
1750
1751 return output
1752
1753 async def _exec_inspect_command(
1754 self, inspect_command: str, kdu_model: str, repo_url: str = None
1755 ):
1756 """Obtains information about an Helm Chart package (´helm show´ command)
1757
1758 Args:
1759 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1760 kdu_model: The name or path of an Helm Chart
1761 repo_url: Helm Chart repository url
1762
1763 Returns:
1764 str: the requested info about the Helm Chart package
1765 """
1766
1767 repo_str = ""
1768 if repo_url:
1769 repo_str = " --repo {}".format(quote(repo_url))
1770
1771 # Obtain the Chart's name and store it in the var kdu_model
1772 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1773
1774 kdu_model, version = self._split_version(kdu_model)
1775 if version:
1776 version_str = "--version {}".format(quote(version))
1777 else:
1778 version_str = ""
1779
1780 full_command = self._get_inspect_command(
1781 show_command=inspect_command,
1782 kdu_model=quote(kdu_model),
1783 repo_str=repo_str,
1784 version=version_str,
1785 )
1786
1787 output, _ = await self._local_async_exec(command=full_command)
1788
1789 return output
1790
1791 async def _get_replica_count_url(
1792 self,
1793 kdu_model: str,
1794 repo_url: str = None,
1795 resource_name: str = None,
1796 ) -> tuple[int, str]:
1797 """Get the replica count value in the Helm Chart Values.
1798
1799 Args:
1800 kdu_model: The name or path of an Helm Chart
1801 repo_url: Helm Chart repository url
1802 resource_name: Resource name
1803
1804 Returns:
1805 A tuple with:
1806 - The number of replicas of the specific instance; if not found, returns None; and
1807 - The string corresponding to the replica count key in the Helm values
1808 """
1809
1810 kdu_values = yaml.load(
1811 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1812 Loader=yaml.SafeLoader,
1813 )
1814
1815 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1816
1817 if not kdu_values:
1818 raise K8sException(
1819 "kdu_values not found for kdu_model {}".format(kdu_model)
1820 )
1821
1822 if resource_name:
1823 kdu_values = kdu_values.get(resource_name, None)
1824
1825 if not kdu_values:
1826 msg = "resource {} not found in the values in model {}".format(
1827 resource_name, kdu_model
1828 )
1829 self.log.error(msg)
1830 raise K8sException(msg)
1831
1832 duplicate_check = False
1833
1834 replica_str = ""
1835 replicas = None
1836
1837 if kdu_values.get("replicaCount") is not None:
1838 replicas = kdu_values["replicaCount"]
1839 replica_str = "replicaCount"
1840 elif kdu_values.get("replicas") is not None:
1841 duplicate_check = True
1842 replicas = kdu_values["replicas"]
1843 replica_str = "replicas"
1844 else:
1845 if resource_name:
1846 msg = (
1847 "replicaCount or replicas not found in the resource"
1848 "{} values in model {}. Cannot be scaled".format(
1849 resource_name, kdu_model
1850 )
1851 )
1852 else:
1853 msg = (
1854 "replicaCount or replicas not found in the values"
1855 "in model {}. Cannot be scaled".format(kdu_model)
1856 )
1857 self.log.error(msg)
1858 raise K8sException(msg)
1859
1860 # Control if replicas and replicaCount exists at the same time
1861 msg = "replicaCount and replicas are exists at the same time"
1862 if duplicate_check:
1863 if "replicaCount" in kdu_values:
1864 self.log.error(msg)
1865 raise K8sException(msg)
1866 else:
1867 if "replicas" in kdu_values:
1868 self.log.error(msg)
1869 raise K8sException(msg)
1870
1871 return replicas, replica_str
1872
1873 async def _get_replica_count_instance(
1874 self,
1875 kdu_instance: str,
1876 namespace: str,
1877 kubeconfig: str,
1878 resource_name: str = None,
1879 ) -> int:
1880 """Get the replica count value in the instance.
1881
1882 Args:
1883 kdu_instance: The name of the KDU instance
1884 namespace: KDU instance namespace
1885 kubeconfig:
1886 resource_name: Resource name
1887
1888 Returns:
1889 The number of replicas of the specific instance; if not found, returns None
1890 """
1891
1892 kdu_values = yaml.load(
1893 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1894 Loader=yaml.SafeLoader,
1895 )
1896
1897 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1898
1899 replicas = None
1900
1901 if kdu_values:
1902 resource_values = (
1903 kdu_values.get(resource_name, None) if resource_name else None
1904 )
1905
1906 for replica_str in ("replicaCount", "replicas"):
1907 if resource_values:
1908 replicas = resource_values.get(replica_str)
1909 else:
1910 replicas = kdu_values.get(replica_str)
1911
1912 if replicas is not None:
1913 break
1914
1915 return replicas
1916
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        namespace: str = None,
        db_dict: dict = None,
    ) -> None:
        """
        Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

        :param cluster_id (str): the cluster where the KDU instance is deployed
        :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
        :param kdu_instance (str): The KDU instance in relation to which the status is obtained
        :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
        :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
        values for the keys:
            - "collection": The Mongo DB collection to write to
            - "filter": The query filter to use in the update process
            - "path": The dot separated keys which targets the object to be updated
        Defaults to None.
        """

        # best-effort operation: any failure is logged but never raised to the
        # caller, so a status-write problem cannot abort the ongoing operation
        try:
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )

            # extract the summary description from the detailed helm status
            status = detailed_status.get("info").get("description")
            self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )

            if not result:
                self.log.info("Error writing in database. Task exiting...")

        except asyncio.CancelledError as e:
            self.log.warning(
                f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
            )
        except Exception as e:
            self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1968
1969 # params for use in -f file
1970 # returns values file option and filename (in order to delete it at the end)
1971 def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
1972 if params and len(params) > 0:
1973 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1974
1975 def get_random_number():
1976 r = random.SystemRandom().randint(1, 99999999)
1977 s = str(r)
1978 while len(s) < 10:
1979 s = "0" + s
1980 return s
1981
1982 params2 = dict()
1983 for key in params:
1984 value = params.get(key)
1985 if "!!yaml" in str(value):
1986 value = yaml.safe_load(value[7:])
1987 params2[key] = value
1988
1989 values_file = get_random_number() + ".yaml"
1990 with open(values_file, "w") as stream:
1991 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1992
1993 return "-f {}".format(values_file), values_file
1994
1995 return "", None
1996
1997 # params for use in --set option
1998 @staticmethod
1999 def _params_to_set_option(params: dict) -> str:
2000 pairs = [
2001 f"{quote(str(key))}={quote(str(value))}"
2002 for key, value in params.items()
2003 if value is not None
2004 ]
2005 if not pairs:
2006 return ""
2007 return "--set " + ",".join(pairs)
2008
2009 @staticmethod
2010 def generate_kdu_instance_name(**kwargs):
2011 chart_name = kwargs["kdu_model"]
2012 # check embeded chart (file or dir)
2013 if chart_name.startswith("/"):
2014 # extract file or directory name
2015 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2016 # check URL
2017 elif "://" in chart_name:
2018 # extract last portion of URL
2019 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2020
2021 name = ""
2022 for c in chart_name:
2023 if c.isalpha() or c.isnumeric():
2024 name += c
2025 else:
2026 name += "-"
2027 if len(name) > 35:
2028 name = name[0:35]
2029
2030 # if does not start with alpha character, prefix 'a'
2031 if not name[0].isalpha():
2032 name = "a" + name
2033
2034 name += "-"
2035
2036 def get_random_number():
2037 r = random.SystemRandom().randint(1, 99999999)
2038 s = str(r)
2039 s = s.rjust(10, "0")
2040 return s
2041
2042 name = name + get_random_number()
2043 return name.lower()
2044
2045 def _split_version(self, kdu_model: str) -> tuple[str, str]:
2046 version = None
2047 if (
2048 not (
2049 self._is_helm_chart_a_file(kdu_model)
2050 or self._is_helm_chart_a_url(kdu_model)
2051 )
2052 and ":" in kdu_model
2053 ):
2054 parts = kdu_model.split(sep=":")
2055 if len(parts) == 2:
2056 version = str(parts[1])
2057 kdu_model = parts[0]
2058 return kdu_model, version
2059
2060 def _split_repo(self, kdu_model: str) -> tuple[str, str]:
2061 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2062
2063 Args:
2064 kdu_model (str): Associated KDU model
2065
2066 Returns:
2067 (str, str): Tuple with the Chart name in index 0, and the repo name
2068 in index 2; if there was a problem finding them, return None
2069 for both
2070 """
2071
2072 chart_name = None
2073 repo_name = None
2074
2075 idx = kdu_model.find("/")
2076 if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
2077 chart_name = kdu_model[idx + 1 :]
2078 repo_name = kdu_model[:idx]
2079
2080 return chart_name, repo_name
2081
2082 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2083 """Obtain the Helm repository for an Helm Chart
2084
2085 Args:
2086 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2087 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2088
2089 Returns:
2090 str: the repository URL; if Helm Chart is a local one, the function returns None
2091 """
2092
2093 _, repo_name = self._split_repo(kdu_model=kdu_model)
2094
2095 repo_url = None
2096 if repo_name:
2097 # Find repository link
2098 local_repo_list = await self.repo_list(cluster_uuid)
2099 for repo in local_repo_list:
2100 if repo["name"] == repo_name:
2101 repo_url = repo["url"]
2102 break # it is not necessary to continue the loop if the repo link was found...
2103
2104 return repo_url
2105
2106 def _repo_to_oci_url(self, repo):
2107 db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
2108 if db_repo and "oci" in db_repo:
2109 return db_repo.get("url")
2110
2111 async def _prepare_helm_chart(self, kdu_model, cluster_id):
2112 # e.g.: "stable/openldap", "1.0"
2113 kdu_model, version = self._split_version(kdu_model)
2114 # e.g.: "openldap, stable"
2115 chart_name, repo = self._split_repo(kdu_model)
2116 if repo and chart_name: # repo/chart case
2117 oci_url = self._repo_to_oci_url(repo)
2118 if oci_url: # oci does not require helm repo update
2119 kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
2120 else:
2121 await self.repo_update(cluster_id, repo)
2122 return kdu_model, version
2123
2124 async def create_certificate(
2125 self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2126 ):
2127 paths, env = self._init_paths_env(
2128 cluster_name=cluster_uuid, create_if_not_exist=True
2129 )
2130 kubectl = Kubectl(config_file=paths["kube_config"])
2131 await kubectl.create_certificate(
2132 namespace=namespace,
2133 name=name,
2134 dns_prefix=dns_prefix,
2135 secret_name=secret_name,
2136 usages=[usage],
2137 issuer_name="ca-issuer",
2138 )
2139
2140 async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2141 paths, env = self._init_paths_env(
2142 cluster_name=cluster_uuid, create_if_not_exist=True
2143 )
2144 kubectl = Kubectl(config_file=paths["kube_config"])
2145 await kubectl.delete_certificate(namespace, certificate_name)
2146
2147 async def create_namespace(
2148 self,
2149 namespace,
2150 cluster_uuid,
2151 labels,
2152 ):
2153 """
2154 Create a namespace in a specific cluster
2155
2156 :param namespace: Namespace to be created
2157 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2158 :param labels: Dictionary with labels for the new namespace
2159 :returns: None
2160 """
2161 paths, env = self._init_paths_env(
2162 cluster_name=cluster_uuid, create_if_not_exist=True
2163 )
2164 kubectl = Kubectl(config_file=paths["kube_config"])
2165 await kubectl.create_namespace(
2166 name=namespace,
2167 labels=labels,
2168 )
2169
2170 async def delete_namespace(
2171 self,
2172 namespace,
2173 cluster_uuid,
2174 ):
2175 """
2176 Delete a namespace in a specific cluster
2177
2178 :param namespace: namespace to be deleted
2179 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2180 :returns: None
2181 """
2182 paths, env = self._init_paths_env(
2183 cluster_name=cluster_uuid, create_if_not_exist=True
2184 )
2185 kubectl = Kubectl(config_file=paths["kube_config"])
2186 await kubectl.delete_namespace(
2187 name=namespace,
2188 )
2189
2190 async def copy_secret_data(
2191 self,
2192 src_secret: str,
2193 dst_secret: str,
2194 cluster_uuid: str,
2195 data_key: str,
2196 src_namespace: str = "osm",
2197 dst_namespace: str = "osm",
2198 ):
2199 """
2200 Copy a single key and value from an existing secret to a new one
2201
2202 :param src_secret: name of the existing secret
2203 :param dst_secret: name of the new secret
2204 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2205 :param data_key: key of the existing secret to be copied
2206 :param src_namespace: Namespace of the existing secret
2207 :param dst_namespace: Namespace of the new secret
2208 :returns: None
2209 """
2210 paths, env = self._init_paths_env(
2211 cluster_name=cluster_uuid, create_if_not_exist=True
2212 )
2213 kubectl = Kubectl(config_file=paths["kube_config"])
2214 secret_data = await kubectl.get_secret_content(
2215 name=src_secret,
2216 namespace=src_namespace,
2217 )
2218 # Only the corresponding data_key value needs to be copy
2219 data = {data_key: secret_data.get(data_key)}
2220 await kubectl.create_secret(
2221 name=dst_secret,
2222 data=data,
2223 namespace=dst_namespace,
2224 secret_type="Opaque",
2225 )
2226
2227 async def setup_default_rbac(
2228 self,
2229 name,
2230 namespace,
2231 cluster_uuid,
2232 api_groups,
2233 resources,
2234 verbs,
2235 service_account,
2236 ):
2237 """
2238 Create a basic RBAC for a new namespace.
2239
2240 :param name: name of both Role and Role Binding
2241 :param namespace: K8s namespace
2242 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2243 :param api_groups: Api groups to be allowed in Policy Rule
2244 :param resources: Resources to be allowed in Policy Rule
2245 :param verbs: Verbs to be allowed in Policy Rule
2246 :param service_account: Service Account name used to bind the Role
2247 :returns: None
2248 """
2249 paths, env = self._init_paths_env(
2250 cluster_name=cluster_uuid, create_if_not_exist=True
2251 )
2252 kubectl = Kubectl(config_file=paths["kube_config"])
2253 await kubectl.create_role(
2254 name=name,
2255 labels={},
2256 namespace=namespace,
2257 api_groups=api_groups,
2258 resources=resources,
2259 verbs=verbs,
2260 )
2261 await kubectl.create_role_binding(
2262 name=name,
2263 labels={},
2264 namespace=namespace,
2265 role_name=name,
2266 sa_name=service_account,
2267 )