fb5aa0958e2c0f3cadd690abf4273a37fe71e071
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 from shlex import quote
26 import random
27 import time
28 import shlex
29 import shutil
30 import stat
31 import os
32 import yaml
33 from uuid import uuid4
34 from urllib.parse import urlparse
35
36 from n2vc.config import EnvironConfig
37 from n2vc.exceptions import K8sException
38 from n2vc.k8s_conn import K8sConnector
39 from n2vc.kubectl import Kubectl
40
41
42 class K8sHelmBaseConnector(K8sConnector):
43
44 """
45 ####################################################################################
46 ################################### P U B L I C ####################################
47 ####################################################################################
48 """
49
50 service_account = "osm"
51
52 def __init__(
53 self,
54 fs: object,
55 db: object,
56 kubectl_command: str = "/usr/bin/kubectl",
57 helm_command: str = "/usr/bin/helm",
58 log: object = None,
59 on_update_db=None,
60 ):
61 """
62
63 :param fs: file system for kubernetes and helm configuration
64 :param db: database object to write current operation status
65 :param kubectl_command: path to kubectl executable
66 :param helm_command: path to helm executable
67 :param log: logger
68 :param on_update_db: callback called when k8s connector updates database
69 """
70
71 # parent class
72 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
73
74 self.log.info("Initializing K8S Helm connector")
75
76 self.config = EnvironConfig()
77 # random numbers for release name generation
78 random.seed(time.time())
79
80 # the file system
81 self.fs = fs
82
83 # exception if kubectl is not installed
84 self.kubectl_command = kubectl_command
85 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
86
87 # exception if helm is not installed
88 self._helm_command = helm_command
89 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
90
91 # exception if main post renderer executable is not present
92 self.main_post_renderer_path = EnvironConfig(prefixes=["OSMLCM_"]).get(
93 "mainpostrendererpath"
94 )
95 if self.main_post_renderer_path:
96 self._check_file_exists(
97 filename=self.main_post_renderer_path, exception_if_not_exists=True
98 )
99
100 # exception if podLabels post renderer executable is not present
101 self.podLabels_post_renderer_path = EnvironConfig(prefixes=["OSMLCM_"]).get(
102 "podlabelspostrendererpath"
103 )
104 if self.podLabels_post_renderer_path:
105 self._check_file_exists(
106 filename=self.podLabels_post_renderer_path, exception_if_not_exists=True
107 )
108
109 # obtain stable repo url from config or apply default
110 self._stable_repo_url = self.config.get("stablerepourl")
111 if self._stable_repo_url == "None":
112 self._stable_repo_url = None
113
114 # Lock to avoid concurrent execution of helm commands
115 self.cmd_lock = asyncio.Lock()
116
117 def _get_namespace(self, cluster_uuid: str) -> str:
118 """
119 Obtains the namespace used by the cluster with the uuid passed by argument
120
121 param: cluster_uuid: cluster's uuid
122 """
123
124 # first, obtain the cluster corresponding to the uuid passed by argument
125 k8scluster = self.db.get_one(
126 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
127 )
128 return k8scluster.get("namespace")
129
130 async def init_env(
131 self,
132 k8s_creds: str,
133 namespace: str = "kube-system",
134 reuse_cluster_uuid=None,
135 **kwargs,
136 ) -> tuple[str, bool]:
137 """
138 It prepares a given K8s cluster environment to run Charts
139
140 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
141 '.kube/config'
142 :param namespace: optional namespace to be used for helm. By default,
143 'kube-system' will be used
144 :param reuse_cluster_uuid: existing cluster uuid for reuse
145 :param kwargs: Additional parameters (None yet)
146 :return: uuid of the K8s cluster and True if connector has installed some
147 software in the cluster
148 (on error, an exception will be raised)
149 """
150
151 if reuse_cluster_uuid:
152 cluster_id = reuse_cluster_uuid
153 else:
154 cluster_id = str(uuid4())
155
156 self.log.debug(
157 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
158 )
159
160 paths, env = self._init_paths_env(
161 cluster_name=cluster_id, create_if_not_exist=True
162 )
163 mode = stat.S_IRUSR | stat.S_IWUSR
164 with open(paths["kube_config"], "w", mode) as f:
165 f.write(k8s_creds)
166 os.chmod(paths["kube_config"], 0o600)
167
168 # Code with initialization specific of helm version
169 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
170
171 # sync fs with local data
172 self.fs.reverse_sync(from_path=cluster_id)
173
174 self.log.info("Cluster {} initialized".format(cluster_id))
175
176 return cluster_id, n2vc_installed_sw
177
    async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = "chart",
        cert: str = None,
        user: str = None,
        password: str = None,
        oci: bool = False,
    ):
        """Register a helm repository (or log in to an OCI registry) for a cluster.

        :param cluster_uuid: UUID of the target cluster
        :param name: name under which the repository is registered
        :param url: repository URL (an "oci://..." reference when oci is True)
        :param repo_type: repository type, used only for logging
        :param cert: optional CA certificate content, written to a temp file
            and passed via --ca-file
        :param user: optional user name for authentication
        :param password: optional password for authentication
        :param oci: if True, perform "helm registry login" instead of "helm repo add"
        """
        self.log.debug(
            "Cluster {}, adding {} repository {}. URL: {}".format(
                cluster_uuid, repo_type, name, url
            )
        )

        # init_env
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        if oci:
            if user and password:
                # registry login expects host[:port], so strip the oci:// scheme
                host_port = urlparse(url).netloc if url.startswith("oci://") else url
                # helm registry login url
                command = "env KUBECONFIG={} {} registry login {}".format(
                    paths["kube_config"], self._helm_command, quote(host_port)
                )
            else:
                # anonymous OCI access needs no login at all
                self.log.debug(
                    "OCI registry login is not needed for repo: {}".format(name)
                )
                return
        else:
            # helm repo add name url
            command = "env KUBECONFIG={} {} repo add {} {}".format(
                paths["kube_config"], self._helm_command, quote(name), quote(url)
            )

        # the following flags apply to both "registry login" and "repo add"
        if cert:
            temp_cert_file = os.path.join(
                self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
            )
            os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
            with open(temp_cert_file, "w") as the_cert:
                the_cert.write(cert)
            command += " --ca-file {}".format(quote(temp_cert_file))

        if user:
            command += " --username={}".format(quote(user))

        if password:
            command += " --password={}".format(quote(password))

        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if not oci:
            # helm repo update (failures tolerated: the repo itself was added)
            command = "env KUBECONFIG={} {} repo update {}".format(
                paths["kube_config"], self._helm_command, quote(name)
            )
            self.log.debug("updating repo: {}".format(command))
            await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)
253
254 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
255 self.log.debug(
256 "Cluster {}, updating {} repository {}".format(
257 cluster_uuid, repo_type, name
258 )
259 )
260
261 # init_env
262 paths, env = self._init_paths_env(
263 cluster_name=cluster_uuid, create_if_not_exist=True
264 )
265
266 # sync local dir
267 self.fs.sync(from_path=cluster_uuid)
268
269 # helm repo update
270 command = "{} repo update {}".format(self._helm_command, quote(name))
271 self.log.debug("updating repo: {}".format(command))
272 await self._local_async_exec(
273 command=command, raise_exception_on_error=False, env=env
274 )
275
276 # sync fs
277 self.fs.reverse_sync(from_path=cluster_uuid)
278
279 async def repo_list(self, cluster_uuid: str) -> list:
280 """
281 Get the list of registered repositories
282
283 :return: list of registered repositories: [ (name, url) .... ]
284 """
285
286 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
287
288 # config filename
289 paths, env = self._init_paths_env(
290 cluster_name=cluster_uuid, create_if_not_exist=True
291 )
292
293 # sync local dir
294 self.fs.sync(from_path=cluster_uuid)
295
296 command = "env KUBECONFIG={} {} repo list --output yaml".format(
297 paths["kube_config"], self._helm_command
298 )
299
300 # Set exception to false because if there are no repos just want an empty list
301 output, _rc = await self._local_async_exec(
302 command=command, raise_exception_on_error=False, env=env
303 )
304
305 # sync fs
306 self.fs.reverse_sync(from_path=cluster_uuid)
307
308 if _rc == 0:
309 if output and len(output) > 0:
310 repos = yaml.load(output, Loader=yaml.SafeLoader)
311 # unify format between helm2 and helm3 setting all keys lowercase
312 return self._lower_keys_list(repos)
313 else:
314 return []
315 else:
316 return []
317
318 async def repo_remove(self, cluster_uuid: str, name: str):
319 self.log.debug(
320 "remove {} repositories for cluster {}".format(name, cluster_uuid)
321 )
322
323 # init env, paths
324 paths, env = self._init_paths_env(
325 cluster_name=cluster_uuid, create_if_not_exist=True
326 )
327
328 # sync local dir
329 self.fs.sync(from_path=cluster_uuid)
330
331 command = "env KUBECONFIG={} {} repo remove {}".format(
332 paths["kube_config"], self._helm_command, quote(name)
333 )
334 await self._local_async_exec(
335 command=command, raise_exception_on_error=True, env=env
336 )
337
338 # sync fs
339 self.fs.reverse_sync(from_path=cluster_uuid)
340
    async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False,
        **kwargs,
    ) -> bool:
        """Reset a cluster

        Resets the Kubernetes cluster by removing the helm deployment that represents it.

        :param cluster_uuid: The UUID of the cluster to reset
        :param force: Boolean to force the reset (uninstall deployed releases first)
        :param uninstall_sw: Boolean to also uninstall the software this connector
            installed in the cluster
        :param kwargs: Additional parameters (None yet)
        :return: Returns True if successful or raises an exception.
        """
        namespace = self._get_namespace(cluster_uuid=cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_uuid, uninstall_sw
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    # best-effort uninstall of every deployed release
                    for r in releases:
                        try:
                            kdu_instance = r.get("name")
                            chart = r.get("chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            # will not raise exception as it was found
                            # that in some cases of previously installed helm releases it
                            # raised an error
                            self.log.warn(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    # releases exist and no force: keep the helm software in place
                    msg = (
                        "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                    ).format(cluster_uuid)
                    self.log.warn(msg)
                    uninstall_sw = (
                        False  # Allow to remove k8s cluster without removing Tiller
                    )

        if uninstall_sw:
            await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

        # delete cluster directory
        self.log.debug("Removing directory {}".format(cluster_uuid))
        self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
        # Remove also local directorio if still exist
        direct = self.fs.path + "/" + cluster_uuid
        shutil.rmtree(direct, ignore_errors=True)

        return True
412
413 def _is_helm_chart_a_file(self, chart_name: str):
414 return chart_name.count("/") > 1
415
416 @staticmethod
417 def _is_helm_chart_a_url(chart_name: str):
418 result = urlparse(chart_name)
419 return all([result.scheme, result.netloc])
420
    async def _install_impl(
        self,
        cluster_id: str,
        kdu_model: str,
        paths: dict,
        env: dict,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        labels: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """Shared install implementation used by the version-specific connectors.

        :param cluster_id: target cluster id
        :param kdu_model: chart reference to install
        :param paths: cluster paths (note: recomputed below, argument is overridden)
        :param env: helm environment (note: recomputed below, argument is overridden)
        :param kdu_instance: release name to install under
        :param atomic: if True, run helm atomically and track status while waiting
        :param timeout: helm timeout in seconds
        :param params: values to pass to the chart
        :param db_dict: database info used to write operation status
        :param labels: labels to attach to the release
        :param kdu_name: KDU name (not used in this method body)
        :param namespace: target namespace
        :raises K8sException: if the helm command exits with a non-zero code
        """
        # init env, paths
        # NOTE: the paths/env arguments are intentionally replaced with fresh values
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str (written to a temp values file when present)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        # resolve the chart reference and version to install
        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            labels,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task (runs while helm blocks on --atomic)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:
            # non-atomic: just run helm, no parallel status reporting
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)
511
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        namespace: str = None,
        reset_values: bool = False,
        reuse_values: bool = True,
        reset_then_reuse_values: bool = False,
        force: bool = False,
    ):
        """Upgrade a deployed KDU instance to a new chart reference or new values.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name of the deployed KDU instance
        :param kdu_model: chart reference to upgrade to
        :param atomic: if True, roll back on failure and track status while waiting
        :param timeout: helm timeout in seconds
        :param params: values to pass to the chart
        :param db_dict: database info used to write the operation status
        :param namespace: namespace of the instance; looked up from the deployed
            release when not provided
        :param reset_values: reset the values to the chart's built-ins
        :param reuse_values: reuse the last release's values
        :param reset_then_reuse_values: reset the values, then apply the last
            release's values
        :param force: force resource updates through a replacement strategy
        :return: the new revision number, or 0 if it cannot be obtained
        :raises K8sException: if the instance is not found or the helm command fails
        """
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace

        # set namespace
        if not namespace:
            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
            if not instance_info:
                raise K8sException("kdu_instance {} not found".format(kdu_instance))
            namespace = instance_info["namespace"]

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_uuid, params=params
        )

        # resolve the chart reference and version to upgrade to
        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)

        # only recompute the OSM labels when the release already carries them
        labels_dict = None
        if db_dict and await self._contains_labels(
            kdu_instance, namespace, paths["kube_config"], env
        ):
            labels_dict = await self._labels_dict(db_dict, kdu_instance)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            labels_dict,
            version,
            atomic,
            timeout,
            paths["kube_config"],
            reset_values,
            reuse_values,
            reset_then_reuse_values,
            force,
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task (runs while helm blocks on --atomic)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="upgrade",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
641
    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        cluster_uuid: str = None,
        kdu_model: str = None,
        atomic: bool = True,
        db_dict: dict = None,
        **kwargs,
    ):
        """Scale a resource in a Helm Chart.

        Args:
            kdu_instance: KDU instance name
            scale: Scale to which to set the resource
            resource_name: Resource name
            total_timeout: The time, in seconds, to wait
            cluster_uuid: The UUID of the cluster
            kdu_model: The chart reference
            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            db_dict: Dictionary for any additional data
            kwargs: Additional parameters

        Returns:
            True if successful, False otherwise

        Raises:
            K8sException: if the instance is not found or the helm command fails
        """

        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_mgs = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_mgs)

        # look for instance to obtain namespace
        # get_instance_info function calls the sync command
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # version
        kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)

        repo_url = await self._find_repo(kdu_model, cluster_uuid)

        # name of the replica-count key inside the chart's values
        _, replica_str = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        # only recompute the OSM labels when the release already carries them
        labels_dict = None
        if db_dict and await self._contains_labels(
            kdu_instance, instance_info["namespace"], paths["kube_config"], env
        ):
            labels_dict = await self._labels_dict(db_dict, kdu_instance)

        command = self._get_upgrade_scale_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            scale,
            labels_dict,
            version,
            atomic,
            replica_str,
            total_timeout,
            resource_name,
            paths["kube_config"],
        )

        self.log.debug("scaling: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task (runs while helm blocks on --atomic)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="scale",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="scale",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return True
770
    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        cluster_uuid: str,
        kdu_model: str,
        **kwargs,
    ) -> int:
        """Get a resource scale count.

        Args:
            cluster_uuid: The UUID of the cluster
            resource_name: Resource name
            kdu_instance: KDU instance name
            kdu_model: The name or path of an Helm Chart
            kwargs: Additional parameters

        Returns:
            Resource instance count

        Raises:
            K8sException: if the instance is not found or no replica count can
                be determined from the instance values nor the chart defaults
        """

        self.log.debug(
            "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
        )

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, _ = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # first attempt: read the count from the deployed release's values
        replicas = await self._get_replica_count_instance(
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            kubeconfig=paths["kube_config"],
            resource_name=resource_name,
        )

        self.log.debug(
            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
        )

        # Get default value if scale count is not found from provided values
        # Important note: this piece of code shall only be executed in the first scaling operation,
        # since it is expected that the _get_replica_count_instance is able to obtain the number of
        # replicas when a scale operation was already conducted previously for this KDU/resource!
        if replicas is None:
            repo_url = await self._find_repo(
                kdu_model=kdu_model, cluster_uuid=cluster_uuid
            )
            replicas, _ = await self._get_replica_count_url(
                kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
            )

            self.log.debug(
                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
                f"{resource_name} obtained: {replicas}"
            )

            if replicas is None:
                msg = "Replica count not found. Cannot be scaled"
                self.log.error(msg)
                raise K8sException(msg)

        return int(replicas)
840
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """Roll back a KDU instance to a previous revision.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name of the deployed KDU instance
        :param revision: revision to roll back to (helm treats 0 as the previous one)
        :param db_dict: database info used to write the operation status
        :return: the new revision number, or 0 if it cannot be obtained
        :raises K8sException: if the instance is not found or the helm command fails
        """
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_rollback_command(
            kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
        )

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task (runs while helm performs the rollback)
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
924
925 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
926 """
927 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
928 (this call should happen after all _terminate-config-primitive_ of the VNF
929 are invoked).
930
931 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
932 :param kdu_instance: unique name for the KDU instance to be deleted
933 :param kwargs: Additional parameters (None yet)
934 :return: True if successful
935 """
936
937 self.log.debug(
938 "uninstall kdu_instance {} from cluster {}".format(
939 kdu_instance, cluster_uuid
940 )
941 )
942
943 # sync local dir
944 self.fs.sync(from_path=cluster_uuid)
945
946 # look for instance to obtain namespace
947 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
948 if not instance_info:
949 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
950 return True
951 # init env, paths
952 paths, env = self._init_paths_env(
953 cluster_name=cluster_uuid, create_if_not_exist=True
954 )
955
956 # sync local dir
957 self.fs.sync(from_path=cluster_uuid)
958
959 command = self._get_uninstall_command(
960 kdu_instance, instance_info["namespace"], paths["kube_config"]
961 )
962 output, _rc = await self._local_async_exec(
963 command=command, raise_exception_on_error=True, env=env
964 )
965
966 # sync fs
967 self.fs.reverse_sync(from_path=cluster_uuid)
968
969 return self._output_to_table(output)
970
971 async def instances_list(self, cluster_uuid: str) -> list:
972 """
973 returns a list of deployed releases in a cluster
974
975 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
976 :return:
977 """
978
979 self.log.debug("list releases for cluster {}".format(cluster_uuid))
980
981 # sync local dir
982 self.fs.sync(from_path=cluster_uuid)
983
984 # execute internal command
985 result = await self._instances_list(cluster_uuid)
986
987 # sync fs
988 self.fs.reverse_sync(from_path=cluster_uuid)
989
990 return result
991
992 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
993 instances = await self.instances_list(cluster_uuid=cluster_uuid)
994 for instance in instances:
995 if instance.get("name") == kdu_instance:
996 return instance
997 self.log.debug("Instance {} not found".format(kdu_instance))
998 return None
999
1000 async def upgrade_charm(
1001 self,
1002 ee_id: str = None,
1003 path: str = None,
1004 charm_id: str = None,
1005 charm_type: str = None,
1006 timeout: float = None,
1007 ) -> str:
1008 """This method upgrade charms in VNFs
1009
1010 Args:
1011 ee_id: Execution environment id
1012 path: Local path to the charm
1013 charm_id: charm-id
1014 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
1015 timeout: (Float) Timeout for the ns update operation
1016
1017 Returns:
1018 The output of the update operation if status equals to "completed"
1019 """
1020 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
1021
1022 async def exec_primitive(
1023 self,
1024 cluster_uuid: str = None,
1025 kdu_instance: str = None,
1026 primitive_name: str = None,
1027 timeout: float = 300,
1028 params: dict = None,
1029 db_dict: dict = None,
1030 **kwargs,
1031 ) -> str:
1032 """Exec primitive (Juju action)
1033
1034 :param cluster_uuid: The UUID of the cluster or namespace:cluster
1035 :param kdu_instance: The unique name of the KDU instance
1036 :param primitive_name: Name of action that will be executed
1037 :param timeout: Timeout for action execution
1038 :param params: Dictionary of all the parameters needed for the action
1039 :db_dict: Dictionary for any additional data
1040 :param kwargs: Additional parameters (None yet)
1041
1042 :return: Returns the output of the action
1043 """
1044 raise K8sException(
1045 "KDUs deployed with Helm don't support actions "
1046 "different from rollback, upgrade and status"
1047 )
1048
1049 async def get_services(
1050 self, cluster_uuid: str, kdu_instance: str, namespace: str
1051 ) -> list:
1052 """
1053 Returns a list of services defined for the specified kdu instance.
1054
1055 :param cluster_uuid: UUID of a K8s cluster known by OSM
1056 :param kdu_instance: unique name for the KDU instance
1057 :param namespace: K8s namespace used by the KDU instance
1058 :return: If successful, it will return a list of services, Each service
1059 can have the following data:
1060 - `name` of the service
1061 - `type` type of service in the k8 cluster
1062 - `ports` List of ports offered by the service, for each port includes at least
1063 name, port, protocol
1064 - `cluster_ip` Internal ip to be used inside k8s cluster
1065 - `external_ip` List of external ips (in case they are available)
1066 """
1067
1068 self.log.debug(
1069 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1070 cluster_uuid, kdu_instance
1071 )
1072 )
1073
1074 # init env, paths
1075 paths, env = self._init_paths_env(
1076 cluster_name=cluster_uuid, create_if_not_exist=True
1077 )
1078
1079 # sync local dir
1080 self.fs.sync(from_path=cluster_uuid)
1081
1082 # get list of services names for kdu
1083 service_names = await self._get_services(
1084 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1085 )
1086
1087 service_list = []
1088 for service in service_names:
1089 service = await self._get_service(cluster_uuid, service, namespace)
1090 service_list.append(service)
1091
1092 # sync fs
1093 self.fs.reverse_sync(from_path=cluster_uuid)
1094
1095 return service_list
1096
1097 async def get_service(
1098 self, cluster_uuid: str, service_name: str, namespace: str
1099 ) -> object:
1100 self.log.debug(
1101 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1102 service_name, namespace, cluster_uuid
1103 )
1104 )
1105
1106 # sync local dir
1107 self.fs.sync(from_path=cluster_uuid)
1108
1109 service = await self._get_service(cluster_uuid, service_name, namespace)
1110
1111 # sync fs
1112 self.fs.reverse_sync(from_path=cluster_uuid)
1113
1114 return service
1115
1116 async def status_kdu(
1117 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1118 ) -> Union[str, dict]:
1119 """
1120 This call would retrieve tha current state of a given KDU instance. It would be
1121 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1122 values_ of the configuration parameters applied to a given instance. This call
1123 would be based on the `status` call.
1124
1125 :param cluster_uuid: UUID of a K8s cluster known by OSM
1126 :param kdu_instance: unique name for the KDU instance
1127 :param kwargs: Additional parameters (None yet)
1128 :param yaml_format: if the return shall be returned as an YAML string or as a
1129 dictionary
1130 :return: If successful, it will return the following vector of arguments:
1131 - K8s `namespace` in the cluster where the KDU lives
1132 - `state` of the KDU instance. It can be:
1133 - UNKNOWN
1134 - DEPLOYED
1135 - DELETED
1136 - SUPERSEDED
1137 - FAILED or
1138 - DELETING
1139 - List of `resources` (objects) that this release consists of, sorted by kind,
1140 and the status of those resources
1141 - Last `deployment_time`.
1142
1143 """
1144 self.log.debug(
1145 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1146 cluster_uuid, kdu_instance
1147 )
1148 )
1149
1150 # sync local dir
1151 self.fs.sync(from_path=cluster_uuid)
1152
1153 # get instance: needed to obtain namespace
1154 instances = await self._instances_list(cluster_id=cluster_uuid)
1155 for instance in instances:
1156 if instance.get("name") == kdu_instance:
1157 break
1158 else:
1159 # instance does not exist
1160 raise K8sException(
1161 "Instance name: {} not found in cluster: {}".format(
1162 kdu_instance, cluster_uuid
1163 )
1164 )
1165
1166 status = await self._status_kdu(
1167 cluster_id=cluster_uuid,
1168 kdu_instance=kdu_instance,
1169 namespace=instance["namespace"],
1170 yaml_format=yaml_format,
1171 show_error_log=True,
1172 )
1173
1174 # sync fs
1175 self.fs.reverse_sync(from_path=cluster_uuid)
1176
1177 return status
1178
1179 async def get_values_kdu(
1180 self, kdu_instance: str, namespace: str, kubeconfig: str
1181 ) -> str:
1182 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1183
1184 return await self._exec_get_command(
1185 get_command="values",
1186 kdu_instance=kdu_instance,
1187 namespace=namespace,
1188 kubeconfig=kubeconfig,
1189 )
1190
1191 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1192 """Method to obtain the Helm Chart package's values
1193
1194 Args:
1195 kdu_model: The name or path of an Helm Chart
1196 repo_url: Helm Chart repository url
1197
1198 Returns:
1199 str: the values of the Helm Chart package
1200 """
1201
1202 self.log.debug(
1203 "inspect kdu_model values {} from (optional) repo: {}".format(
1204 kdu_model, repo_url
1205 )
1206 )
1207
1208 return await self._exec_inspect_command(
1209 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1210 )
1211
1212 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1213 self.log.debug(
1214 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1215 )
1216
1217 return await self._exec_inspect_command(
1218 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1219 )
1220
1221 async def synchronize_repos(self, cluster_uuid: str):
1222 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1223 try:
1224 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1225 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1226
1227 local_repo_list = await self.repo_list(cluster_uuid)
1228 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1229
1230 deleted_repo_list = []
1231 added_repo_dict = {}
1232
1233 # iterate over the list of repos in the database that should be
1234 # added if not present
1235 for repo_name, db_repo in db_repo_dict.items():
1236 try:
1237 # check if it is already present
1238 curr_repo_url = local_repo_dict.get(db_repo["name"])
1239 repo_id = db_repo.get("_id")
1240 if curr_repo_url != db_repo["url"]:
1241 if curr_repo_url:
1242 self.log.debug(
1243 "repo {} url changed, delete and and again".format(
1244 db_repo["url"]
1245 )
1246 )
1247 await self.repo_remove(cluster_uuid, db_repo["name"])
1248 deleted_repo_list.append(repo_id)
1249
1250 # add repo
1251 self.log.debug("add repo {}".format(db_repo["name"]))
1252 await self.repo_add(
1253 cluster_uuid,
1254 db_repo["name"],
1255 db_repo["url"],
1256 cert=db_repo.get("ca_cert"),
1257 user=db_repo.get("user"),
1258 password=db_repo.get("password"),
1259 oci=db_repo.get("oci", False),
1260 )
1261 added_repo_dict[repo_id] = db_repo["name"]
1262 except Exception as e:
1263 raise K8sException(
1264 "Error adding repo id: {}, err_msg: {} ".format(
1265 repo_id, repr(e)
1266 )
1267 )
1268
1269 # Delete repos that are present but not in nbi_list
1270 for repo_name in local_repo_dict:
1271 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1272 self.log.debug("delete repo {}".format(repo_name))
1273 try:
1274 await self.repo_remove(cluster_uuid, repo_name)
1275 deleted_repo_list.append(repo_name)
1276 except Exception as e:
1277 self.warning(
1278 "Error deleting repo, name: {}, err_msg: {}".format(
1279 repo_name, str(e)
1280 )
1281 )
1282
1283 return deleted_repo_list, added_repo_dict
1284
1285 except K8sException:
1286 raise
1287 except Exception as e:
1288 # Do not raise errors synchronizing repos
1289 self.log.error("Error synchronizing repos: {}".format(e))
1290 raise Exception("Error synchronizing repos: {}".format(e))
1291
1292 def _get_db_repos_dict(self, repo_ids: list):
1293 db_repos_dict = {}
1294 for repo_id in repo_ids:
1295 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1296 db_repos_dict[db_repo["name"]] = db_repo
1297 return db_repos_dict
1298
1299 """
1300 ####################################################################################
1301 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1302 ####################################################################################
1303 """
1304
    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Create (if requested) the base cluster and kube directories and return them.

        Helm-version-specific directories are also created according to the new
        directory specification; their paths are not returned but exported
        through the helm environment variables instead.

        :param cluster_name: name/id of the cluster whose paths are initialized
        :param create_if_not_exist: if True, create the directories when missing
        :return: tuple (dictionary with config paths, dictionary with helm
            environment variables)
        """
1315
    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization

        :param cluster_id: id of the cluster to initialize
        :param namespace: namespace to use for the helm resources
        :param paths: config paths dictionary, as returned by _init_paths_env
        :param env: helm environment variables, as returned by _init_paths_env
        """
1321
    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent helm instances list

        :param cluster_id: id of the cluster whose releases are listed
        :return: list of instance dicts; each entry contains at least the
            release "name" and "namespace" keys (see status_kdu usage)
        """
1327
    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        """
        Implements the helm version dependent method to obtain services from a helm instance

        :param cluster_id: id of a K8s cluster known by OSM
        :param kdu_instance: unique name of the KDU instance
        :param namespace: K8s namespace used by the KDU instance
        :param kubeconfig: path to the cluster kubeconfig file
        :return: list of service names deployed by the instance
        """
1333
    @abc.abstractmethod
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """
        Implements the helm version dependent method to obtain status of a helm instance

        :param cluster_id: id of a K8s cluster known by OSM
        :param kdu_instance: unique name of the KDU instance
        :param namespace: K8s namespace of the instance (optional)
        :param yaml_format: if True return the status as a yaml string,
            otherwise as a dictionary
        :param show_error_log: if True, log errors when the status cannot be obtained
        :return: status of the release, as yaml string or dict depending on yaml_format
        """
1346
    @abc.abstractmethod
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        labels,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """
        Obtain the command to be executed to install the indicated instance.

        :param kdu_model: Kdu model name (Helm local location or repository)
        :param kdu_instance: name of the Helm Chart release to create
        :param namespace: namespace where the KDU instance will be deployed
        :param labels: labels to set on the release objects
        :param params_str: params string used to install the release
        :param version: constraint with specific version of the Chart to use
        :param atomic: if set, roll back changes made in case of failed install
        :param timeout: the time, in seconds, to wait
        :param kubeconfig: kubeconfig file path
        :return: the full helm install command line
        """
1363
    @abc.abstractmethod
    def _get_upgrade_scale_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        count,
        labels,
        version,
        atomic,
        replicas,
        timeout,
        resource_name,
        kubeconfig,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            count (int): Scale count (target number of replicas)
            labels: labels to set on the release objects
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replicas (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """
1397
    @abc.abstractmethod
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        labels,
        version,
        atomic,
        timeout,
        kubeconfig,
        reset_values,
        reuse_values,
        reset_then_reuse_values,
        force,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            labels: labels to set on the upgraded release objects
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            reset_values(bool): If set, helm resets values instead of reusing previous values.
            reuse_values(bool): If set, helm reuses previous values.
            reset_then_reuse_values(bool): If set, helm resets values, then apply the last release's values
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """
1434
    @abc.abstractmethod
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """
        Obtain command to be executed to rollback the indicated instance

        :param kdu_instance: name of the release to roll back
        :param namespace: K8s namespace of the release
        :param revision: revision number to roll back to
        :param kubeconfig: kubeconfig file path
        :return: the full helm rollback command line
        """
1442
    @abc.abstractmethod
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """
        Obtain command to be executed to delete the indicated instance

        :param kdu_instance: name of the release to delete
        :param namespace: K8s namespace of the release
        :param kubeconfig: kubeconfig file path
        :return: the full helm uninstall/delete command line
        """
1450
    @abc.abstractmethod
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generates the command to obtain the information about an Helm Chart package
        (´helm show ...´ command)

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of an Helm Chart
            repo_str: repository option string (' --repo <url>'), or "" when no
                repository applies (see _exec_inspect_command)
            version: version option string ('--version <v>'), or "" when no
                specific version is requested

        Returns:
            str: the generated Helm Chart command
        """
1467
    @abc.abstractmethod
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance.

        :param get_command: helm get sub command (e.g. "values")
        :param kdu_instance: name of the release to query
        :param namespace: K8s namespace of the release
        :param kubeconfig: kubeconfig file path
        :return: the full helm get command line
        """
1473
    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method called to uninstall cluster software for helm. This method is
        dependent of helm version.
        For Helm v2 it will be called when Tiller must be uninstalled.
        For Helm v3 it does nothing and does not need to be called.

        :param cluster_id: id of a K8s cluster known by OSM
        :param namespace: namespace where the helm software was installed
        """
1482
    @abc.abstractmethod
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the cluster repos identifiers

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :return: list of repository ids registered for the cluster
        """
1488
1489 """
1490 ####################################################################################
1491 ################################### P R I V A T E ##################################
1492 ####################################################################################
1493 """
1494
1495 @staticmethod
1496 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1497 if os.path.exists(filename):
1498 return True
1499 else:
1500 msg = "File {} does not exist".format(filename)
1501 if exception_if_not_exists:
1502 raise K8sException(msg)
1503
1504 @staticmethod
1505 def _remove_multiple_spaces(strobj):
1506 strobj = strobj.strip()
1507 while " " in strobj:
1508 strobj = strobj.replace(" ", " ")
1509 return strobj
1510
1511 @staticmethod
1512 def _output_to_lines(output: str) -> list:
1513 output_lines = list()
1514 lines = output.splitlines(keepends=False)
1515 for line in lines:
1516 line = line.strip()
1517 if len(line) > 0:
1518 output_lines.append(line)
1519 return output_lines
1520
1521 @staticmethod
1522 def _output_to_table(output: str) -> list:
1523 output_table = list()
1524 lines = output.splitlines(keepends=False)
1525 for line in lines:
1526 line = line.replace("\t", " ")
1527 line_list = list()
1528 output_table.append(line_list)
1529 cells = line.split(sep=" ")
1530 for cell in cells:
1531 cell = cell.strip()
1532 if len(cell) > 0:
1533 line_list.append(cell)
1534 return output_table
1535
1536 @staticmethod
1537 def _parse_services(output: str) -> list:
1538 lines = output.splitlines(keepends=False)
1539 services = []
1540 for line in lines:
1541 line = line.replace("\t", " ")
1542 cells = line.split(sep=" ")
1543 if len(cells) > 0 and cells[0].startswith("service/"):
1544 elems = cells[0].split(sep="/")
1545 if len(elems) > 1:
1546 services.append(elems[1])
1547 return services
1548
1549 @staticmethod
1550 def _get_deep(dictionary: dict, members: tuple):
1551 target = dictionary
1552 value = None
1553 try:
1554 for m in members:
1555 value = target.get(m)
1556 if not value:
1557 return None
1558 else:
1559 target = value
1560 except Exception:
1561 pass
1562 return value
1563
1564 # find key:value in several lines
1565 @staticmethod
1566 def _find_in_lines(p_lines: list, p_key: str) -> str:
1567 for line in p_lines:
1568 try:
1569 if line.startswith(p_key + ":"):
1570 parts = line.split(":")
1571 the_value = parts[1].strip()
1572 return the_value
1573 except Exception:
1574 # ignore it
1575 pass
1576 return None
1577
1578 @staticmethod
1579 def _lower_keys_list(input_list: list):
1580 """
1581 Transform the keys in a list of dictionaries to lower case and returns a new list
1582 of dictionaries
1583 """
1584 new_list = []
1585 if input_list:
1586 for dictionary in input_list:
1587 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1588 new_list.append(new_dict)
1589 return new_list
1590
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ) -> tuple[str, int]:
        """
        Run a local command asynchronously and return its output and return code.

        :param command: command line to execute (split with shlex; no shell)
        :param raise_exception_on_error: raise K8sException instead of
            returning when the command fails
        :param show_error_log: log output at debug level when the command fails
        :param encode_utf8: post-process the output through an utf-8
            encode/str round-trip (legacy behavior, kept for compatibility)
        :param env: extra environment variables overlaid on os.environ
        :return: tuple (output, return_code); ("", -1) on execution error when
            not raising
        :raises K8sException: on failure, if raise_exception_on_error is True
        """
        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command into an argv list; the subprocess is exec'ed, no shell
        command = shlex.split(command)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # serialize command executions; the lock is released once the
            # process has terminated
            async with self.cmd_lock:
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=environ,
                )

                # wait for command terminate
                stdout, stderr = await process.communicate()

                return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
            # NOTE(review): when both streams carry data, stderr overwrites
            # stdout here — presumably intentional (errors take precedence);
            # confirm before changing
            if stderr:
                output = stderr.decode("utf-8").strip()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # first, kill the process if it is still running
            # NOTE(review): if create_subprocess_exec itself was cancelled,
            # 'process' may be unbound here — confirm
            if process.returncode is None:
                process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1663
    async def _local_async_exec_pipe(
        self,
        command1: str,
        command2: str,
        raise_exception_on_error: bool = True,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ):
        """
        Run 'command1 | command2' asynchronously; output and return code are
        taken from the second command.

        :param command1: producer command line
        :param command2: consumer command line (its stdout is captured)
        :param raise_exception_on_error: raise K8sException instead of
            returning when the pipeline fails
        :param show_error_log: log output at debug level when the pipeline fails
        :param encode_utf8: post-process the output through an utf-8
            encode/str round-trip (legacy behavior, kept for compatibility)
        :param env: extra environment variables overlaid on os.environ
        :return: tuple (output, return_code); ("", -1) on execution error when
            not raising
        :raises K8sException: on failure, if raise_exception_on_error is True
        """
        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split both commands into argv lists; no shell is involved
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            async with self.cmd_lock:
                # connect process_1 stdout to process_2 stdin through an os
                # pipe; each parent-side fd is closed as soon as it has been
                # handed over so that EOF propagates correctly
                read, write = os.pipe()
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
                os.close(read)
                stdout, stderr = await process_2.communicate()

                return_code = process_2.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
            # NOTE(review): stderr of process_2 is not piped above, so this
            # branch appears unreachable; kept to mirror _local_async_exec
            if stderr:
                output = stderr.decode("utf-8").strip()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # first, kill the processes if they are still running
            for process in (process_1, process_2):
                if process.returncode is None:
                    process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1741
1742 async def _get_service(self, cluster_id, service_name, namespace):
1743 """
1744 Obtains the data of the specified service in the k8cluster.
1745
1746 :param cluster_id: id of a K8s cluster known by OSM
1747 :param service_name: name of the K8s service in the specified namespace
1748 :param namespace: K8s namespace used by the KDU instance
1749 :return: If successful, it will return a service with the following data:
1750 - `name` of the service
1751 - `type` type of service in the k8 cluster
1752 - `ports` List of ports offered by the service, for each port includes at least
1753 name, port, protocol
1754 - `cluster_ip` Internal ip to be used inside k8s cluster
1755 - `external_ip` List of external ips (in case they are available)
1756 """
1757
1758 # init config, env
1759 paths, env = self._init_paths_env(
1760 cluster_name=cluster_id, create_if_not_exist=True
1761 )
1762
1763 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1764 self.kubectl_command,
1765 paths["kube_config"],
1766 quote(namespace),
1767 quote(service_name),
1768 )
1769
1770 output, _rc = await self._local_async_exec(
1771 command=command, raise_exception_on_error=True, env=env
1772 )
1773
1774 data = yaml.load(output, Loader=yaml.SafeLoader)
1775
1776 service = {
1777 "name": service_name,
1778 "type": self._get_deep(data, ("spec", "type")),
1779 "ports": self._get_deep(data, ("spec", "ports")),
1780 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1781 }
1782 if service["type"] == "LoadBalancer":
1783 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1784 ip_list = [elem["ip"] for elem in ip_map_list]
1785 service["external_ip"] = ip_list
1786
1787 return service
1788
1789 async def _exec_get_command(
1790 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1791 ):
1792 """Obtains information about the kdu instance."""
1793
1794 full_command = self._get_get_command(
1795 get_command, kdu_instance, namespace, kubeconfig
1796 )
1797
1798 output, _rc = await self._local_async_exec(command=full_command)
1799
1800 return output
1801
1802 async def _exec_inspect_command(
1803 self, inspect_command: str, kdu_model: str, repo_url: str = None
1804 ):
1805 """Obtains information about an Helm Chart package (´helm show´ command)
1806
1807 Args:
1808 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1809 kdu_model: The name or path of an Helm Chart
1810 repo_url: Helm Chart repository url
1811
1812 Returns:
1813 str: the requested info about the Helm Chart package
1814 """
1815
1816 repo_str = ""
1817 if repo_url:
1818 repo_str = " --repo {}".format(quote(repo_url))
1819
1820 # Obtain the Chart's name and store it in the var kdu_model
1821 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1822
1823 kdu_model, version = self._split_version(kdu_model)
1824 if version:
1825 version_str = "--version {}".format(quote(version))
1826 else:
1827 version_str = ""
1828
1829 full_command = self._get_inspect_command(
1830 show_command=inspect_command,
1831 kdu_model=quote(kdu_model),
1832 repo_str=repo_str,
1833 version=version_str,
1834 )
1835
1836 output, _ = await self._local_async_exec(command=full_command)
1837
1838 return output
1839
1840 async def _get_replica_count_url(
1841 self,
1842 kdu_model: str,
1843 repo_url: str = None,
1844 resource_name: str = None,
1845 ) -> tuple[int, str]:
1846 """Get the replica count value in the Helm Chart Values.
1847
1848 Args:
1849 kdu_model: The name or path of an Helm Chart
1850 repo_url: Helm Chart repository url
1851 resource_name: Resource name
1852
1853 Returns:
1854 A tuple with:
1855 - The number of replicas of the specific instance; if not found, returns None; and
1856 - The string corresponding to the replica count key in the Helm values
1857 """
1858
1859 kdu_values = yaml.load(
1860 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1861 Loader=yaml.SafeLoader,
1862 )
1863
1864 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1865
1866 if not kdu_values:
1867 raise K8sException(
1868 "kdu_values not found for kdu_model {}".format(kdu_model)
1869 )
1870
1871 if resource_name:
1872 kdu_values = kdu_values.get(resource_name, None)
1873
1874 if not kdu_values:
1875 msg = "resource {} not found in the values in model {}".format(
1876 resource_name, kdu_model
1877 )
1878 self.log.error(msg)
1879 raise K8sException(msg)
1880
1881 duplicate_check = False
1882
1883 replica_str = ""
1884 replicas = None
1885
1886 if kdu_values.get("replicaCount") is not None:
1887 replicas = kdu_values["replicaCount"]
1888 replica_str = "replicaCount"
1889 elif kdu_values.get("replicas") is not None:
1890 duplicate_check = True
1891 replicas = kdu_values["replicas"]
1892 replica_str = "replicas"
1893 else:
1894 if resource_name:
1895 msg = (
1896 "replicaCount or replicas not found in the resource"
1897 "{} values in model {}. Cannot be scaled".format(
1898 resource_name, kdu_model
1899 )
1900 )
1901 else:
1902 msg = (
1903 "replicaCount or replicas not found in the values"
1904 "in model {}. Cannot be scaled".format(kdu_model)
1905 )
1906 self.log.error(msg)
1907 raise K8sException(msg)
1908
1909 # Control if replicas and replicaCount exists at the same time
1910 msg = "replicaCount and replicas are exists at the same time"
1911 if duplicate_check:
1912 if "replicaCount" in kdu_values:
1913 self.log.error(msg)
1914 raise K8sException(msg)
1915 else:
1916 if "replicas" in kdu_values:
1917 self.log.error(msg)
1918 raise K8sException(msg)
1919
1920 return replicas, replica_str
1921
1922 async def _get_replica_count_instance(
1923 self,
1924 kdu_instance: str,
1925 namespace: str,
1926 kubeconfig: str,
1927 resource_name: str = None,
1928 ) -> int:
1929 """Get the replica count value in the instance.
1930
1931 Args:
1932 kdu_instance: The name of the KDU instance
1933 namespace: KDU instance namespace
1934 kubeconfig:
1935 resource_name: Resource name
1936
1937 Returns:
1938 The number of replicas of the specific instance; if not found, returns None
1939 """
1940
1941 kdu_values = yaml.load(
1942 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1943 Loader=yaml.SafeLoader,
1944 )
1945
1946 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1947
1948 replicas = None
1949
1950 if kdu_values:
1951 resource_values = (
1952 kdu_values.get(resource_name, None) if resource_name else None
1953 )
1954
1955 for replica_str in ("replicaCount", "replicas"):
1956 if resource_values:
1957 replicas = resource_values.get(replica_str)
1958 else:
1959 replicas = kdu_values.get(replica_str)
1960
1961 if replicas is not None:
1962 break
1963
1964 return replicas
1965
1966 async def _labels_dict(self, db_dict, kdu_instance):
1967 # get the network service registry
1968 ns_id = db_dict["filter"]["_id"]
1969 try:
1970 db_nsr = self.db.get_one("nsrs", {"_id": ns_id})
1971 except Exception as e:
1972 print("nsr {} not found: {}".format(ns_id, e))
1973 nsd_id = db_nsr["nsd"]["_id"]
1974
1975 # get the kdu registry
1976 for index, kdu in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
1977 if kdu["kdu-instance"] == kdu_instance:
1978 db_kdur = kdu
1979 break
1980 else:
1981 # No kdur found, could be the case of an EE chart
1982 return {}
1983
1984 kdu_name = db_kdur["kdu-name"]
1985 member_vnf_index = db_kdur["member-vnf-index"]
1986 # get the vnf registry
1987 try:
1988 db_vnfr = self.db.get_one(
1989 "vnfrs",
1990 {"nsr-id-ref": ns_id, "member-vnf-index-ref": member_vnf_index},
1991 )
1992 except Exception as e:
1993 print("vnfr {} not found: {}".format(member_vnf_index, e))
1994
1995 vnf_id = db_vnfr["_id"]
1996 vnfd_id = db_vnfr["vnfd-id"]
1997
1998 return {
1999 "managed-by": "osm.etsi.org",
2000 "osm.etsi.org/ns-id": ns_id,
2001 "osm.etsi.org/nsd-id": nsd_id,
2002 "osm.etsi.org/vnf-id": vnf_id,
2003 "osm.etsi.org/vnfd-id": vnfd_id,
2004 "osm.etsi.org/kdu-id": kdu_instance,
2005 "osm.etsi.org/kdu-name": kdu_name,
2006 }
2007
2008 async def _contains_labels(self, kdu_instance, namespace, kube_config, env):
2009 command = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
2010 kube_config,
2011 self._helm_command,
2012 quote(kdu_instance),
2013 quote(namespace),
2014 )
2015 output, rc = await self._local_async_exec(
2016 command=command, raise_exception_on_error=False, env=env
2017 )
2018 manifests = yaml.safe_load_all(output)
2019 for manifest in manifests:
2020 # Check if the manifest has metadata and labels
2021 if (
2022 manifest is not None
2023 and "metadata" in manifest
2024 and "labels" in manifest["metadata"]
2025 ):
2026 labels = {
2027 "managed-by",
2028 "osm.etsi.org/kdu-id",
2029 "osm.etsi.org/kdu-name",
2030 "osm.etsi.org/ns-id",
2031 "osm.etsi.org/nsd-id",
2032 "osm.etsi.org/vnf-id",
2033 "osm.etsi.org/vnfd-id",
2034 }
2035 if labels.issubset(manifest["metadata"]["labels"].keys()):
2036 return True
2037 return False
2038
2039 async def _store_status(
2040 self,
2041 cluster_id: str,
2042 operation: str,
2043 kdu_instance: str,
2044 namespace: str = None,
2045 db_dict: dict = None,
2046 ) -> None:
2047 """
2048 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
2049
2050 :param cluster_id (str): the cluster where the KDU instance is deployed
2051 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
2052 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
2053 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
2054 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
2055 values for the keys:
2056 - "collection": The Mongo DB collection to write to
2057 - "filter": The query filter to use in the update process
2058 - "path": The dot separated keys which targets the object to be updated
2059 Defaults to None.
2060 """
2061
2062 try:
2063 detailed_status = await self._status_kdu(
2064 cluster_id=cluster_id,
2065 kdu_instance=kdu_instance,
2066 yaml_format=False,
2067 namespace=namespace,
2068 )
2069
2070 status = detailed_status.get("info").get("description")
2071 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
2072
2073 # write status to db
2074 result = await self.write_app_status_to_db(
2075 db_dict=db_dict,
2076 status=str(status),
2077 detailed_status=str(detailed_status),
2078 operation=operation,
2079 )
2080
2081 if not result:
2082 self.log.info("Error writing in database. Task exiting...")
2083
2084 except asyncio.CancelledError as e:
2085 self.log.warning(
2086 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
2087 )
2088 except Exception as e:
2089 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
2090
2091 # params for use in -f file
2092 # returns values file option and filename (in order to delete it at the end)
2093 def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
2094 if params and len(params) > 0:
2095 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
2096
2097 def get_random_number():
2098 r = random.SystemRandom().randint(1, 99999999)
2099 s = str(r)
2100 while len(s) < 10:
2101 s = "0" + s
2102 return s
2103
2104 params2 = dict()
2105 for key in params:
2106 value = params.get(key)
2107 if "!!yaml" in str(value):
2108 value = yaml.safe_load(value[7:])
2109 params2[key] = value
2110
2111 values_file = get_random_number() + ".yaml"
2112 with open(values_file, "w") as stream:
2113 yaml.dump(params2, stream, indent=4, default_flow_style=False)
2114
2115 return "-f {}".format(values_file), values_file
2116
2117 return "", None
2118
2119 # params for use in --set option
2120 @staticmethod
2121 def _params_to_set_option(params: dict) -> str:
2122 pairs = [
2123 f"{quote(str(key))}={quote(str(value))}"
2124 for key, value in params.items()
2125 if value is not None
2126 ]
2127 if not pairs:
2128 return ""
2129 return "--set " + ",".join(pairs)
2130
2131 @staticmethod
2132 def generate_kdu_instance_name(**kwargs):
2133 chart_name = kwargs["kdu_model"]
2134 # check embeded chart (file or dir)
2135 if chart_name.startswith("/"):
2136 # extract file or directory name
2137 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2138 # check URL
2139 elif "://" in chart_name:
2140 # extract last portion of URL
2141 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2142
2143 name = ""
2144 for c in chart_name:
2145 if c.isalpha() or c.isnumeric():
2146 name += c
2147 else:
2148 name += "-"
2149 if len(name) > 35:
2150 name = name[0:35]
2151
2152 # if does not start with alpha character, prefix 'a'
2153 if not name[0].isalpha():
2154 name = "a" + name
2155
2156 name += "-"
2157
2158 def get_random_number():
2159 r = random.SystemRandom().randint(1, 99999999)
2160 s = str(r)
2161 s = s.rjust(10, "0")
2162 return s
2163
2164 name = name + get_random_number()
2165 return name.lower()
2166
2167 def _split_version(self, kdu_model: str) -> tuple[str, str]:
2168 version = None
2169 if (
2170 not (
2171 self._is_helm_chart_a_file(kdu_model)
2172 or self._is_helm_chart_a_url(kdu_model)
2173 )
2174 and ":" in kdu_model
2175 ):
2176 parts = kdu_model.split(sep=":")
2177 if len(parts) == 2:
2178 version = str(parts[1])
2179 kdu_model = parts[0]
2180 return kdu_model, version
2181
2182 def _split_repo(self, kdu_model: str) -> tuple[str, str]:
2183 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2184
2185 Args:
2186 kdu_model (str): Associated KDU model
2187
2188 Returns:
2189 (str, str): Tuple with the Chart name in index 0, and the repo name
2190 in index 2; if there was a problem finding them, return None
2191 for both
2192 """
2193
2194 chart_name = None
2195 repo_name = None
2196
2197 idx = kdu_model.find("/")
2198 if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
2199 chart_name = kdu_model[idx + 1 :]
2200 repo_name = kdu_model[:idx]
2201
2202 return chart_name, repo_name
2203
2204 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2205 """Obtain the Helm repository for an Helm Chart
2206
2207 Args:
2208 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2209 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2210
2211 Returns:
2212 str: the repository URL; if Helm Chart is a local one, the function returns None
2213 """
2214
2215 _, repo_name = self._split_repo(kdu_model=kdu_model)
2216
2217 repo_url = None
2218 if repo_name:
2219 # Find repository link
2220 local_repo_list = await self.repo_list(cluster_uuid)
2221 for repo in local_repo_list:
2222 if repo["name"] == repo_name:
2223 repo_url = repo["url"]
2224 break # it is not necessary to continue the loop if the repo link was found...
2225
2226 return repo_url
2227
2228 def _repo_to_oci_url(self, repo):
2229 db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
2230 if db_repo and "oci" in db_repo:
2231 return db_repo.get("url")
2232
2233 async def _prepare_helm_chart(self, kdu_model, cluster_id):
2234 # e.g.: "stable/openldap", "1.0"
2235 kdu_model, version = self._split_version(kdu_model)
2236 # e.g.: "openldap, stable"
2237 chart_name, repo = self._split_repo(kdu_model)
2238 if repo and chart_name: # repo/chart case
2239 oci_url = self._repo_to_oci_url(repo)
2240 if oci_url: # oci does not require helm repo update
2241 kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
2242 else:
2243 await self.repo_update(cluster_id, repo)
2244 return kdu_model, version
2245
2246 async def create_certificate(
2247 self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2248 ):
2249 paths, env = self._init_paths_env(
2250 cluster_name=cluster_uuid, create_if_not_exist=True
2251 )
2252 kubectl = Kubectl(config_file=paths["kube_config"])
2253 await kubectl.create_certificate(
2254 namespace=namespace,
2255 name=name,
2256 dns_prefix=dns_prefix,
2257 secret_name=secret_name,
2258 usages=[usage],
2259 issuer_name="ca-issuer",
2260 )
2261
2262 async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2263 paths, env = self._init_paths_env(
2264 cluster_name=cluster_uuid, create_if_not_exist=True
2265 )
2266 kubectl = Kubectl(config_file=paths["kube_config"])
2267 await kubectl.delete_certificate(namespace, certificate_name)
2268
2269 async def create_namespace(
2270 self,
2271 namespace,
2272 cluster_uuid,
2273 labels,
2274 ):
2275 """
2276 Create a namespace in a specific cluster
2277
2278 :param namespace: Namespace to be created
2279 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2280 :param labels: Dictionary with labels for the new namespace
2281 :returns: None
2282 """
2283 paths, env = self._init_paths_env(
2284 cluster_name=cluster_uuid, create_if_not_exist=True
2285 )
2286 kubectl = Kubectl(config_file=paths["kube_config"])
2287 await kubectl.create_namespace(
2288 name=namespace,
2289 labels=labels,
2290 )
2291
2292 async def delete_namespace(
2293 self,
2294 namespace,
2295 cluster_uuid,
2296 ):
2297 """
2298 Delete a namespace in a specific cluster
2299
2300 :param namespace: namespace to be deleted
2301 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2302 :returns: None
2303 """
2304 paths, env = self._init_paths_env(
2305 cluster_name=cluster_uuid, create_if_not_exist=True
2306 )
2307 kubectl = Kubectl(config_file=paths["kube_config"])
2308 await kubectl.delete_namespace(
2309 name=namespace,
2310 )
2311
2312 async def copy_secret_data(
2313 self,
2314 src_secret: str,
2315 dst_secret: str,
2316 cluster_uuid: str,
2317 data_key: str,
2318 src_namespace: str = "osm",
2319 dst_namespace: str = "osm",
2320 ):
2321 """
2322 Copy a single key and value from an existing secret to a new one
2323
2324 :param src_secret: name of the existing secret
2325 :param dst_secret: name of the new secret
2326 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2327 :param data_key: key of the existing secret to be copied
2328 :param src_namespace: Namespace of the existing secret
2329 :param dst_namespace: Namespace of the new secret
2330 :returns: None
2331 """
2332 paths, env = self._init_paths_env(
2333 cluster_name=cluster_uuid, create_if_not_exist=True
2334 )
2335 kubectl = Kubectl(config_file=paths["kube_config"])
2336 secret_data = await kubectl.get_secret_content(
2337 name=src_secret,
2338 namespace=src_namespace,
2339 )
2340 # Only the corresponding data_key value needs to be copy
2341 data = {data_key: secret_data.get(data_key)}
2342 await kubectl.create_secret(
2343 name=dst_secret,
2344 data=data,
2345 namespace=dst_namespace,
2346 secret_type="Opaque",
2347 )
2348
2349 async def setup_default_rbac(
2350 self,
2351 name,
2352 namespace,
2353 cluster_uuid,
2354 api_groups,
2355 resources,
2356 verbs,
2357 service_account,
2358 ):
2359 """
2360 Create a basic RBAC for a new namespace.
2361
2362 :param name: name of both Role and Role Binding
2363 :param namespace: K8s namespace
2364 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2365 :param api_groups: Api groups to be allowed in Policy Rule
2366 :param resources: Resources to be allowed in Policy Rule
2367 :param verbs: Verbs to be allowed in Policy Rule
2368 :param service_account: Service Account name used to bind the Role
2369 :returns: None
2370 """
2371 paths, env = self._init_paths_env(
2372 cluster_name=cluster_uuid, create_if_not_exist=True
2373 )
2374 kubectl = Kubectl(config_file=paths["kube_config"])
2375 await kubectl.create_role(
2376 name=name,
2377 labels={},
2378 namespace=namespace,
2379 api_groups=api_groups,
2380 resources=resources,
2381 verbs=verbs,
2382 )
2383 await kubectl.create_role_binding(
2384 name=name,
2385 labels={},
2386 namespace=namespace,
2387 role_name=name,
2388 sa_name=service_account,
2389 )