Code Coverage

Cobertura Coverage Report > n2vc >

k8s_helm_base_conn.py

Trend

File Coverage summary

NameClassesLinesConditionals
k8s_helm_base_conn.py
100%
1/1
58%
447/772
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_helm_base_conn.py
58%
447/772
N/A

Source

n2vc/k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 1 import abc
23 1 import asyncio
24 1 from typing import Union
25 1 from shlex import quote
26 1 import random
27 1 import time
28 1 import shlex
29 1 import shutil
30 1 import stat
31 1 import os
32 1 import yaml
33 1 from uuid import uuid4
34 1 from urllib.parse import urlparse
35
36 1 from n2vc.config import EnvironConfig
37 1 from n2vc.exceptions import K8sException
38 1 from n2vc.k8s_conn import K8sConnector
39 1 from n2vc.kubectl import Kubectl
40
41
42 1 class K8sHelmBaseConnector(K8sConnector):
43
44     """
45     ####################################################################################
46     ################################### P U B L I C ####################################
47     ####################################################################################
48     """
49
50 1     service_account = "osm"
51
52 1     def __init__(
53         self,
54         fs: object,
55         db: object,
56         kubectl_command: str = "/usr/bin/kubectl",
57         helm_command: str = "/usr/bin/helm",
58         log: object = None,
59         on_update_db=None,
60     ):
61         """
62
63         :param fs: file system for kubernetes and helm configuration
64         :param db: database object to write current operation status
65         :param kubectl_command: path to kubectl executable
66         :param helm_command: path to helm executable
67         :param log: logger
68         :param on_update_db: callback called when k8s connector updates database
69         """
70
71         # parent class
72 1         K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
73
74 1         self.log.info("Initializing K8S Helm connector")
75
76 1         self.config = EnvironConfig()
77         # random numbers for release name generation
78 1         random.seed(time.time())
79
80         # the file system
81 1         self.fs = fs
82
83         # exception if kubectl is not installed
84 1         self.kubectl_command = kubectl_command
85 1         self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
86
87         # exception if helm is not installed
88 1         self._helm_command = helm_command
89 1         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
90
91         # obtain stable repo url from config or apply default
92 1         self._stable_repo_url = self.config.get("stablerepourl")
93 1         if self._stable_repo_url == "None":
94 0             self._stable_repo_url = None
95
96         # Lock to avoid concurrent execution of helm commands
97 1         self.cmd_lock = asyncio.Lock()
98
99 1     def _get_namespace(self, cluster_uuid: str) -> str:
100         """
101         Obtains the namespace used by the cluster with the uuid passed by argument
102
103         param: cluster_uuid: cluster's uuid
104         """
105
106         # first, obtain the cluster corresponding to the uuid passed by argument
107 1         k8scluster = self.db.get_one(
108             "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
109         )
110 1         return k8scluster.get("namespace")
111
112 1     async def init_env(
113         self,
114         k8s_creds: str,
115         namespace: str = "kube-system",
116         reuse_cluster_uuid=None,
117         **kwargs,
118     ) -> tuple[str, bool]:
119         """
120         It prepares a given K8s cluster environment to run Charts
121
122         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
123             '.kube/config'
124         :param namespace: optional namespace to be used for helm. By default,
125             'kube-system' will be used
126         :param reuse_cluster_uuid: existing cluster uuid for reuse
127         :param kwargs: Additional parameters (None yet)
128         :return: uuid of the K8s cluster and True if connector has installed some
129             software in the cluster
130         (on error, an exception will be raised)
131         """
132
133 1         if reuse_cluster_uuid:
134 1             cluster_id = reuse_cluster_uuid
135         else:
136 0             cluster_id = str(uuid4())
137
138 1         self.log.debug(
139             "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
140         )
141
142 1         paths, env = self._init_paths_env(
143             cluster_name=cluster_id, create_if_not_exist=True
144         )
145 1         mode = stat.S_IRUSR | stat.S_IWUSR
146 1         with open(paths["kube_config"], "w", mode) as f:
147 1             f.write(k8s_creds)
148 1         os.chmod(paths["kube_config"], 0o600)
149
150         # Code with initialization specific of helm version
151 1         n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
152
153         # sync fs with local data
154 1         self.fs.reverse_sync(from_path=cluster_id)
155
156 1         self.log.info("Cluster {} initialized".format(cluster_id))
157
158 1         return cluster_id, n2vc_installed_sw
159
160 1     async def repo_add(
161         self,
162         cluster_uuid: str,
163         name: str,
164         url: str,
165         repo_type: str = "chart",
166         cert: str = None,
167         user: str = None,
168         password: str = None,
169         oci: bool = False,
170     ):
171 1         self.log.debug(
172             "Cluster {}, adding {} repository {}. URL: {}".format(
173                 cluster_uuid, repo_type, name, url
174             )
175         )
176
177         # init_env
178 1         paths, env = self._init_paths_env(
179             cluster_name=cluster_uuid, create_if_not_exist=True
180         )
181
182         # sync local dir
183 1         self.fs.sync(from_path=cluster_uuid)
184
185 1         if oci:
186 0             if user and password:
187 0                 host_port = urlparse(url).netloc if url.startswith("oci://") else url
188                 # helm registry login url
189 0                 command = "env KUBECONFIG={} {} registry login {}".format(
190                     paths["kube_config"], self._helm_command, quote(host_port)
191                 )
192             else:
193 0                 self.log.debug(
194                     "OCI registry login is not needed for repo: {}".format(name)
195                 )
196 0                 return
197         else:
198             # helm repo add name url
199 1             command = "env KUBECONFIG={} {} repo add {} {}".format(
200                 paths["kube_config"], self._helm_command, quote(name), quote(url)
201             )
202
203 1         if cert:
204 0             temp_cert_file = os.path.join(
205                 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
206             )
207 0             os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
208 0             with open(temp_cert_file, "w") as the_cert:
209 0                 the_cert.write(cert)
210 0             command += " --ca-file {}".format(quote(temp_cert_file))
211
212 1         if user:
213 0             command += " --username={}".format(quote(user))
214
215 1         if password:
216 0             command += " --password={}".format(quote(password))
217
218 1         self.log.debug("adding repo: {}".format(command))
219 1         await self._local_async_exec(
220             command=command, raise_exception_on_error=True, env=env
221         )
222
223 1         if not oci:
224             # helm repo update
225 1             command = "env KUBECONFIG={} {} repo update {}".format(
226                 paths["kube_config"], self._helm_command, quote(name)
227             )
228 1             self.log.debug("updating repo: {}".format(command))
229 1             await self._local_async_exec(
230                 command=command, raise_exception_on_error=False, env=env
231             )
232
233         # sync fs
234 1         self.fs.reverse_sync(from_path=cluster_uuid)
235
236 1     async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
237 1         self.log.debug(
238             "Cluster {}, updating {} repository {}".format(
239                 cluster_uuid, repo_type, name
240             )
241         )
242
243         # init_env
244 1         paths, env = self._init_paths_env(
245             cluster_name=cluster_uuid, create_if_not_exist=True
246         )
247
248         # sync local dir
249 1         self.fs.sync(from_path=cluster_uuid)
250
251         # helm repo update
252 1         command = "{} repo update {}".format(self._helm_command, quote(name))
253 1         self.log.debug("updating repo: {}".format(command))
254 1         await self._local_async_exec(
255             command=command, raise_exception_on_error=False, env=env
256         )
257
258         # sync fs
259 1         self.fs.reverse_sync(from_path=cluster_uuid)
260
261 1     async def repo_list(self, cluster_uuid: str) -> list:
262         """
263         Get the list of registered repositories
264
265         :return: list of registered repositories: [ (name, url) .... ]
266         """
267
268 1         self.log.debug("list repositories for cluster {}".format(cluster_uuid))
269
270         # config filename
271 1         paths, env = self._init_paths_env(
272             cluster_name=cluster_uuid, create_if_not_exist=True
273         )
274
275         # sync local dir
276 1         self.fs.sync(from_path=cluster_uuid)
277
278 1         command = "env KUBECONFIG={} {} repo list --output yaml".format(
279             paths["kube_config"], self._helm_command
280         )
281
282         # Set exception to false because if there are no repos just want an empty list
283 1         output, _rc = await self._local_async_exec(
284             command=command, raise_exception_on_error=False, env=env
285         )
286
287         # sync fs
288 1         self.fs.reverse_sync(from_path=cluster_uuid)
289
290 1         if _rc == 0:
291 1             if output and len(output) > 0:
292 0                 repos = yaml.load(output, Loader=yaml.SafeLoader)
293                 # unify format between helm2 and helm3 setting all keys lowercase
294 0                 return self._lower_keys_list(repos)
295             else:
296 1                 return []
297         else:
298 0             return []
299
300 1     async def repo_remove(self, cluster_uuid: str, name: str):
301 1         self.log.debug(
302             "remove {} repositories for cluster {}".format(name, cluster_uuid)
303         )
304
305         # init env, paths
306 1         paths, env = self._init_paths_env(
307             cluster_name=cluster_uuid, create_if_not_exist=True
308         )
309
310         # sync local dir
311 1         self.fs.sync(from_path=cluster_uuid)
312
313 1         command = "env KUBECONFIG={} {} repo remove {}".format(
314             paths["kube_config"], self._helm_command, quote(name)
315         )
316 1         await self._local_async_exec(
317             command=command, raise_exception_on_error=True, env=env
318         )
319
320         # sync fs
321 1         self.fs.reverse_sync(from_path=cluster_uuid)
322
323 1     async def reset(
324         self,
325         cluster_uuid: str,
326         force: bool = False,
327         uninstall_sw: bool = False,
328         **kwargs,
329     ) -> bool:
330         """Reset a cluster
331
332         Resets the Kubernetes cluster by removing the helm deployment that represents it.
333
334         :param cluster_uuid: The UUID of the cluster to reset
335         :param force: Boolean to force the reset
336         :param uninstall_sw: Boolean to force the reset
337         :param kwargs: Additional parameters (None yet)
338         :return: Returns True if successful or raises an exception.
339         """
340 1         namespace = self._get_namespace(cluster_uuid=cluster_uuid)
341 1         self.log.debug(
342             "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
343                 cluster_uuid, uninstall_sw
344             )
345         )
346
347         # sync local dir
348 1         self.fs.sync(from_path=cluster_uuid)
349
350         # uninstall releases if needed.
351 1         if uninstall_sw:
352 1             releases = await self.instances_list(cluster_uuid=cluster_uuid)
353 1             if len(releases) > 0:
354 1                 if force:
355 1                     for r in releases:
356 1                         try:
357 1                             kdu_instance = r.get("name")
358 1                             chart = r.get("chart")
359 1                             self.log.debug(
360                                 "Uninstalling {} -> {}".format(chart, kdu_instance)
361                             )
362 1                             await self.uninstall(
363                                 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
364                             )
365 0                         except Exception as e:
366                             # will not raise exception as it was found
367                             # that in some cases of previously installed helm releases it
368                             # raised an error
369 0                             self.log.warn(
370                                 "Error uninstalling release {}: {}".format(
371                                     kdu_instance, e
372                                 )
373                             )
374                 else:
375 0                     msg = (
376                         "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
377                     ).format(cluster_uuid)
378 0                     self.log.warn(msg)
379 0                     uninstall_sw = (
380                         False  # Allow to remove k8s cluster without removing Tiller
381                     )
382
383 1         if uninstall_sw:
384 1             await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
385
386         # delete cluster directory
387 1         self.log.debug("Removing directory {}".format(cluster_uuid))
388 1         self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
389         # Remove also local directorio if still exist
390 1         direct = self.fs.path + "/" + cluster_uuid
391 1         shutil.rmtree(direct, ignore_errors=True)
392
393 1         return True
394
395 1     def _is_helm_chart_a_file(self, chart_name: str):
396 1         return chart_name.count("/") > 1
397
398 1     @staticmethod
399 1     def _is_helm_chart_a_url(chart_name: str):
400 1         result = urlparse(chart_name)
401 1         return all([result.scheme, result.netloc])
402
403 1     async def _install_impl(
404         self,
405         cluster_id: str,
406         kdu_model: str,
407         paths: dict,
408         env: dict,
409         kdu_instance: str,
410         atomic: bool = True,
411         timeout: float = 300,
412         params: dict = None,
413         db_dict: dict = None,
414         kdu_name: str = None,
415         namespace: str = None,
416     ):
417         # init env, paths
418 1         paths, env = self._init_paths_env(
419             cluster_name=cluster_id, create_if_not_exist=True
420         )
421
422         # params to str
423 1         params_str, file_to_delete = self._params_to_file_option(
424             cluster_id=cluster_id, params=params
425         )
426
427 1         kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)
428
429 1         command = self._get_install_command(
430             kdu_model,
431             kdu_instance,
432             namespace,
433             params_str,
434             version,
435             atomic,
436             timeout,
437             paths["kube_config"],
438         )
439
440 1         self.log.debug("installing: {}".format(command))
441
442 1         if atomic:
443             # exec helm in a task
444 1             exec_task = asyncio.ensure_future(
445                 coro_or_future=self._local_async_exec(
446                     command=command, raise_exception_on_error=False, env=env
447                 )
448             )
449
450             # write status in another task
451 1             status_task = asyncio.ensure_future(
452                 coro_or_future=self._store_status(
453                     cluster_id=cluster_id,
454                     kdu_instance=kdu_instance,
455                     namespace=namespace,
456                     db_dict=db_dict,
457                     operation="install",
458                 )
459             )
460
461             # wait for execution task
462 1             await asyncio.wait([exec_task])
463
464             # cancel status task
465 1             status_task.cancel()
466
467 1             output, rc = exec_task.result()
468
469         else:
470 0             output, rc = await self._local_async_exec(
471                 command=command, raise_exception_on_error=False, env=env
472             )
473
474         # remove temporal values yaml file
475 1         if file_to_delete:
476 0             os.remove(file_to_delete)
477
478         # write final status
479 1         await self._store_status(
480             cluster_id=cluster_id,
481             kdu_instance=kdu_instance,
482             namespace=namespace,
483             db_dict=db_dict,
484             operation="install",
485         )
486
487 1         if rc != 0:
488 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
489 0             self.log.error(msg)
490 0             raise K8sException(msg)
491
492 1     async def upgrade(
493         self,
494         cluster_uuid: str,
495         kdu_instance: str,
496         kdu_model: str = None,
497         atomic: bool = True,
498         timeout: float = 300,
499         params: dict = None,
500         db_dict: dict = None,
501         namespace: str = None,
502         force: bool = False,
503     ):
504 1         self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
505
506         # sync local dir
507 1         self.fs.sync(from_path=cluster_uuid)
508
509         # look for instance to obtain namespace
510
511         # set namespace
512 1         if not namespace:
513 1             instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
514 1             if not instance_info:
515 0                 raise K8sException("kdu_instance {} not found".format(kdu_instance))
516 1             namespace = instance_info["namespace"]
517
518         # init env, paths
519 1         paths, env = self._init_paths_env(
520             cluster_name=cluster_uuid, create_if_not_exist=True
521         )
522
523         # sync local dir
524 1         self.fs.sync(from_path=cluster_uuid)
525
526         # params to str
527 1         params_str, file_to_delete = self._params_to_file_option(
528             cluster_id=cluster_uuid, params=params
529         )
530
531 1         kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
532
533 1         command = self._get_upgrade_command(
534             kdu_model,
535             kdu_instance,
536             namespace,
537             params_str,
538             version,
539             atomic,
540             timeout,
541             paths["kube_config"],
542             force,
543         )
544
545 1         self.log.debug("upgrading: {}".format(command))
546
547 1         if atomic:
548             # exec helm in a task
549 1             exec_task = asyncio.ensure_future(
550                 coro_or_future=self._local_async_exec(
551                     command=command, raise_exception_on_error=False, env=env
552                 )
553             )
554             # write status in another task
555 1             status_task = asyncio.ensure_future(
556                 coro_or_future=self._store_status(
557                     cluster_id=cluster_uuid,
558                     kdu_instance=kdu_instance,
559                     namespace=namespace,
560                     db_dict=db_dict,
561                     operation="upgrade",
562                 )
563             )
564
565             # wait for execution task
566 1             await asyncio.wait([exec_task])
567
568             # cancel status task
569 1             status_task.cancel()
570 1             output, rc = exec_task.result()
571
572         else:
573 0             output, rc = await self._local_async_exec(
574                 command=command, raise_exception_on_error=False, env=env
575             )
576
577         # remove temporal values yaml file
578 1         if file_to_delete:
579 0             os.remove(file_to_delete)
580
581         # write final status
582 1         await self._store_status(
583             cluster_id=cluster_uuid,
584             kdu_instance=kdu_instance,
585             namespace=namespace,
586             db_dict=db_dict,
587             operation="upgrade",
588         )
589
590 1         if rc != 0:
591 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
592 0             self.log.error(msg)
593 0             raise K8sException(msg)
594
595         # sync fs
596 1         self.fs.reverse_sync(from_path=cluster_uuid)
597
598         # return new revision number
599 1         instance = await self.get_instance_info(
600             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
601         )
602 1         if instance:
603 1             revision = int(instance.get("revision"))
604 1             self.log.debug("New revision: {}".format(revision))
605 1             return revision
606         else:
607 0             return 0
608
609 1     async def scale(
610         self,
611         kdu_instance: str,
612         scale: int,
613         resource_name: str,
614         total_timeout: float = 1800,
615         cluster_uuid: str = None,
616         kdu_model: str = None,
617         atomic: bool = True,
618         db_dict: dict = None,
619         **kwargs,
620     ):
621         """Scale a resource in a Helm Chart.
622
623         Args:
624             kdu_instance: KDU instance name
625             scale: Scale to which to set the resource
626             resource_name: Resource name
627             total_timeout: The time, in seconds, to wait
628             cluster_uuid: The UUID of the cluster
629             kdu_model: The chart reference
630             atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
631                 The --wait flag will be set automatically if --atomic is used
632             db_dict: Dictionary for any additional data
633             kwargs: Additional parameters
634
635         Returns:
636             True if successful, False otherwise
637         """
638
639 1         debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
640 1         if resource_name:
641 1             debug_mgs = "scaling resource {} in model {} (cluster {})".format(
642                 resource_name, kdu_model, cluster_uuid
643             )
644
645 1         self.log.debug(debug_mgs)
646
647         # look for instance to obtain namespace
648         # get_instance_info function calls the sync command
649 1         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
650 1         if not instance_info:
651 0             raise K8sException("kdu_instance {} not found".format(kdu_instance))
652
653         # init env, paths
654 1         paths, env = self._init_paths_env(
655             cluster_name=cluster_uuid, create_if_not_exist=True
656         )
657
658         # version
659 1         kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
660
661 1         repo_url = await self._find_repo(kdu_model, cluster_uuid)
662
663 1         _, replica_str = await self._get_replica_count_url(
664             kdu_model, repo_url, resource_name
665         )
666
667 1         command = self._get_upgrade_scale_command(
668             kdu_model,
669             kdu_instance,
670             instance_info["namespace"],
671             scale,
672             version,
673             atomic,
674             replica_str,
675             total_timeout,
676             resource_name,
677             paths["kube_config"],
678         )
679
680 1         self.log.debug("scaling: {}".format(command))
681
682 1         if atomic:
683             # exec helm in a task
684 1             exec_task = asyncio.ensure_future(
685                 coro_or_future=self._local_async_exec(
686                     command=command, raise_exception_on_error=False, env=env
687                 )
688             )
689             # write status in another task
690 1             status_task = asyncio.ensure_future(
691                 coro_or_future=self._store_status(
692                     cluster_id=cluster_uuid,
693                     kdu_instance=kdu_instance,
694                     namespace=instance_info["namespace"],
695                     db_dict=db_dict,
696                     operation="scale",
697                 )
698             )
699
700             # wait for execution task
701 1             await asyncio.wait([exec_task])
702
703             # cancel status task
704 1             status_task.cancel()
705 1             output, rc = exec_task.result()
706
707         else:
708 0             output, rc = await self._local_async_exec(
709                 command=command, raise_exception_on_error=False, env=env
710             )
711
712         # write final status
713 1         await self._store_status(
714             cluster_id=cluster_uuid,
715             kdu_instance=kdu_instance,
716             namespace=instance_info["namespace"],
717             db_dict=db_dict,
718             operation="scale",
719         )
720
721 1         if rc != 0:
722 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
723 0             self.log.error(msg)
724 0             raise K8sException(msg)
725
726         # sync fs
727 1         self.fs.reverse_sync(from_path=cluster_uuid)
728
729 1         return True
730
731 1     async def get_scale_count(
732         self,
733         resource_name: str,
734         kdu_instance: str,
735         cluster_uuid: str,
736         kdu_model: str,
737         **kwargs,
738     ) -> int:
739         """Get a resource scale count.
740
741         Args:
742             cluster_uuid: The UUID of the cluster
743             resource_name: Resource name
744             kdu_instance: KDU instance name
745             kdu_model: The name or path of an Helm Chart
746             kwargs: Additional parameters
747
748         Returns:
749             Resource instance count
750         """
751
752 0         self.log.debug(
753             "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
754         )
755
756         # look for instance to obtain namespace
757 0         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
758 0         if not instance_info:
759 0             raise K8sException("kdu_instance {} not found".format(kdu_instance))
760
761         # init env, paths
762 0         paths, _ = self._init_paths_env(
763             cluster_name=cluster_uuid, create_if_not_exist=True
764         )
765
766 0         replicas = await self._get_replica_count_instance(
767             kdu_instance=kdu_instance,
768             namespace=instance_info["namespace"],
769             kubeconfig=paths["kube_config"],
770             resource_name=resource_name,
771         )
772
773 0         self.log.debug(
774             f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
775         )
776
777         # Get default value if scale count is not found from provided values
778         # Important note: this piece of code shall only be executed in the first scaling operation,
779         # since it is expected that the _get_replica_count_instance is able to obtain the number of
780         # replicas when a scale operation was already conducted previously for this KDU/resource!
781 0         if replicas is None:
782 0             repo_url = await self._find_repo(
783                 kdu_model=kdu_model, cluster_uuid=cluster_uuid
784             )
785 0             replicas, _ = await self._get_replica_count_url(
786                 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
787             )
788
789 0             self.log.debug(
790                 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
791                 f"{resource_name} obtained: {replicas}"
792             )
793
794 0             if replicas is None:
795 0                 msg = "Replica count not found. Cannot be scaled"
796 0                 self.log.error(msg)
797 0                 raise K8sException(msg)
798
799 0         return int(replicas)
800
801 1     async def rollback(
802         self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
803     ):
804 1         self.log.debug(
805             "rollback kdu_instance {} to revision {} from cluster {}".format(
806                 kdu_instance, revision, cluster_uuid
807             )
808         )
809
810         # sync local dir
811 1         self.fs.sync(from_path=cluster_uuid)
812
813         # look for instance to obtain namespace
814 1         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
815 1         if not instance_info:
816 0             raise K8sException("kdu_instance {} not found".format(kdu_instance))
817
818         # init env, paths
819 1         paths, env = self._init_paths_env(
820             cluster_name=cluster_uuid, create_if_not_exist=True
821         )
822
823         # sync local dir
824 1         self.fs.sync(from_path=cluster_uuid)
825
826 1         command = self._get_rollback_command(
827             kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
828         )
829
830 1         self.log.debug("rolling_back: {}".format(command))
831
832         # exec helm in a task
833 1         exec_task = asyncio.ensure_future(
834             coro_or_future=self._local_async_exec(
835                 command=command, raise_exception_on_error=False, env=env
836             )
837         )
838         # write status in another task
839 1         status_task = asyncio.ensure_future(
840             coro_or_future=self._store_status(
841                 cluster_id=cluster_uuid,
842                 kdu_instance=kdu_instance,
843                 namespace=instance_info["namespace"],
844                 db_dict=db_dict,
845                 operation="rollback",
846             )
847         )
848
849         # wait for execution task
850 1         await asyncio.wait([exec_task])
851
852         # cancel status task
853 1         status_task.cancel()
854
855 1         output, rc = exec_task.result()
856
857         # write final status
858 1         await self._store_status(
859             cluster_id=cluster_uuid,
860             kdu_instance=kdu_instance,
861             namespace=instance_info["namespace"],
862             db_dict=db_dict,
863             operation="rollback",
864         )
865
866 1         if rc != 0:
867 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
868 0             self.log.error(msg)
869 0             raise K8sException(msg)
870
871         # sync fs
872 1         self.fs.reverse_sync(from_path=cluster_uuid)
873
874         # return new revision number
875 1         instance = await self.get_instance_info(
876             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
877         )
878 1         if instance:
879 1             revision = int(instance.get("revision"))
880 1             self.log.debug("New revision: {}".format(revision))
881 1             return revision
882         else:
883 0             return 0
884
885 1     async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
886         """
887         Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
888         (this call should happen after all _terminate-config-primitive_ of the VNF
889         are invoked).
890
891         :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
892         :param kdu_instance: unique name for the KDU instance to be deleted
893         :param kwargs: Additional parameters (None yet)
894         :return: True if successful
895         """
896
897 1         self.log.debug(
898             "uninstall kdu_instance {} from cluster {}".format(
899                 kdu_instance, cluster_uuid
900             )
901         )
902
903         # sync local dir
904 1         self.fs.sync(from_path=cluster_uuid)
905
906         # look for instance to obtain namespace
907 1         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
908 1         if not instance_info:
909 0             self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
910 0             return True
911         # init env, paths
912 1         paths, env = self._init_paths_env(
913             cluster_name=cluster_uuid, create_if_not_exist=True
914         )
915
916         # sync local dir
917 1         self.fs.sync(from_path=cluster_uuid)
918
919 1         command = self._get_uninstall_command(
920             kdu_instance, instance_info["namespace"], paths["kube_config"]
921         )
922 1         output, _rc = await self._local_async_exec(
923             command=command, raise_exception_on_error=True, env=env
924         )
925
926         # sync fs
927 1         self.fs.reverse_sync(from_path=cluster_uuid)
928
929 1         return self._output_to_table(output)
930
931 1     async def instances_list(self, cluster_uuid: str) -> list:
932         """
933         returns a list of deployed releases in a cluster
934
935         :param cluster_uuid: the 'cluster' or 'namespace:cluster'
936         :return:
937         """
938
939 1         self.log.debug("list releases for cluster {}".format(cluster_uuid))
940
941         # sync local dir
942 1         self.fs.sync(from_path=cluster_uuid)
943
944         # execute internal command
945 1         result = await self._instances_list(cluster_uuid)
946
947         # sync fs
948 1         self.fs.reverse_sync(from_path=cluster_uuid)
949
950 1         return result
951
952 1     async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
953 0         instances = await self.instances_list(cluster_uuid=cluster_uuid)
954 0         for instance in instances:
955 0             if instance.get("name") == kdu_instance:
956 0                 return instance
957 0         self.log.debug("Instance {} not found".format(kdu_instance))
958 0         return None
959
960 1     async def upgrade_charm(
961         self,
962         ee_id: str = None,
963         path: str = None,
964         charm_id: str = None,
965         charm_type: str = None,
966         timeout: float = None,
967     ) -> str:
968         """This method upgrade charms in VNFs
969
970         Args:
971             ee_id:  Execution environment id
972             path:   Local path to the charm
973             charm_id:   charm-id
974             charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
975             timeout: (Float)    Timeout for the ns update operation
976
977         Returns:
978             The output of the update operation if status equals to "completed"
979         """
980 0         raise K8sException("KDUs deployed with Helm do not support charm upgrade")
981
982 1     async def exec_primitive(
983         self,
984         cluster_uuid: str = None,
985         kdu_instance: str = None,
986         primitive_name: str = None,
987         timeout: float = 300,
988         params: dict = None,
989         db_dict: dict = None,
990         **kwargs,
991     ) -> str:
992         """Exec primitive (Juju action)
993
994         :param cluster_uuid: The UUID of the cluster or namespace:cluster
995         :param kdu_instance: The unique name of the KDU instance
996         :param primitive_name: Name of action that will be executed
997         :param timeout: Timeout for action execution
998         :param params: Dictionary of all the parameters needed for the action
999         :db_dict: Dictionary for any additional data
1000         :param kwargs: Additional parameters (None yet)
1001
1002         :return: Returns the output of the action
1003         """
1004 0         raise K8sException(
1005             "KDUs deployed with Helm don't support actions "
1006             "different from rollback, upgrade and status"
1007         )
1008
1009 1     async def get_services(
1010         self, cluster_uuid: str, kdu_instance: str, namespace: str
1011     ) -> list:
1012         """
1013         Returns a list of services defined for the specified kdu instance.
1014
1015         :param cluster_uuid: UUID of a K8s cluster known by OSM
1016         :param kdu_instance: unique name for the KDU instance
1017         :param namespace: K8s namespace used by the KDU instance
1018         :return: If successful, it will return a list of services, Each service
1019         can have the following data:
1020         - `name` of the service
1021         - `type` type of service in the k8 cluster
1022         - `ports` List of ports offered by the service, for each port includes at least
1023         name, port, protocol
1024         - `cluster_ip` Internal ip to be used inside k8s cluster
1025         - `external_ip` List of external ips (in case they are available)
1026         """
1027
1028 1         self.log.debug(
1029             "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1030                 cluster_uuid, kdu_instance
1031             )
1032         )
1033
1034         # init env, paths
1035 1         paths, env = self._init_paths_env(
1036             cluster_name=cluster_uuid, create_if_not_exist=True
1037         )
1038
1039         # sync local dir
1040 1         self.fs.sync(from_path=cluster_uuid)
1041
1042         # get list of services names for kdu
1043 1         service_names = await self._get_services(
1044             cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1045         )
1046
1047 1         service_list = []
1048 1         for service in service_names:
1049 1             service = await self._get_service(cluster_uuid, service, namespace)
1050 1             service_list.append(service)
1051
1052         # sync fs
1053 1         self.fs.reverse_sync(from_path=cluster_uuid)
1054
1055 1         return service_list
1056
1057 1     async def get_service(
1058         self, cluster_uuid: str, service_name: str, namespace: str
1059     ) -> object:
1060 1         self.log.debug(
1061             "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1062                 service_name, namespace, cluster_uuid
1063             )
1064         )
1065
1066         # sync local dir
1067 1         self.fs.sync(from_path=cluster_uuid)
1068
1069 1         service = await self._get_service(cluster_uuid, service_name, namespace)
1070
1071         # sync fs
1072 1         self.fs.reverse_sync(from_path=cluster_uuid)
1073
1074 1         return service
1075
1076 1     async def status_kdu(
1077         self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1078     ) -> Union[str, dict]:
1079         """
1080         This call would retrieve tha current state of a given KDU instance. It would be
1081         would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1082         values_ of the configuration parameters applied to a given instance. This call
1083         would be based on the `status` call.
1084
1085         :param cluster_uuid: UUID of a K8s cluster known by OSM
1086         :param kdu_instance: unique name for the KDU instance
1087         :param kwargs: Additional parameters (None yet)
1088         :param yaml_format: if the return shall be returned as an YAML string or as a
1089                                 dictionary
1090         :return: If successful, it will return the following vector of arguments:
1091         - K8s `namespace` in the cluster where the KDU lives
1092         - `state` of the KDU instance. It can be:
1093               - UNKNOWN
1094               - DEPLOYED
1095               - DELETED
1096               - SUPERSEDED
1097               - FAILED or
1098               - DELETING
1099         - List of `resources` (objects) that this release consists of, sorted by kind,
1100           and the status of those resources
1101         - Last `deployment_time`.
1102
1103         """
1104 0         self.log.debug(
1105             "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1106                 cluster_uuid, kdu_instance
1107             )
1108         )
1109
1110         # sync local dir
1111 0         self.fs.sync(from_path=cluster_uuid)
1112
1113         # get instance: needed to obtain namespace
1114 0         instances = await self._instances_list(cluster_id=cluster_uuid)
1115 0         for instance in instances:
1116 0             if instance.get("name") == kdu_instance:
1117 0                 break
1118         else:
1119             # instance does not exist
1120 0             raise K8sException(
1121                 "Instance name: {} not found in cluster: {}".format(
1122                     kdu_instance, cluster_uuid
1123                 )
1124             )
1125
1126 0         status = await self._status_kdu(
1127             cluster_id=cluster_uuid,
1128             kdu_instance=kdu_instance,
1129             namespace=instance["namespace"],
1130             yaml_format=yaml_format,
1131             show_error_log=True,
1132         )
1133
1134         # sync fs
1135 0         self.fs.reverse_sync(from_path=cluster_uuid)
1136
1137 0         return status
1138
1139 1     async def get_values_kdu(
1140         self, kdu_instance: str, namespace: str, kubeconfig: str
1141     ) -> str:
1142 1         self.log.debug("get kdu_instance values {}".format(kdu_instance))
1143
1144 1         return await self._exec_get_command(
1145             get_command="values",
1146             kdu_instance=kdu_instance,
1147             namespace=namespace,
1148             kubeconfig=kubeconfig,
1149         )
1150
1151 1     async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1152         """Method to obtain the Helm Chart package's values
1153
1154         Args:
1155             kdu_model: The name or path of an Helm Chart
1156             repo_url: Helm Chart repository url
1157
1158         Returns:
1159             str: the values of the Helm Chart package
1160         """
1161
1162 1         self.log.debug(
1163             "inspect kdu_model values {} from (optional) repo: {}".format(
1164                 kdu_model, repo_url
1165             )
1166         )
1167
1168 1         return await self._exec_inspect_command(
1169             inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1170         )
1171
1172 1     async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1173 1         self.log.debug(
1174             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1175         )
1176
1177 1         return await self._exec_inspect_command(
1178             inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1179         )
1180
1181 1     async def synchronize_repos(self, cluster_uuid: str):
1182 1         self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1183 1         try:
1184 1             db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1185 1             db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1186
1187 1             local_repo_list = await self.repo_list(cluster_uuid)
1188 1             local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1189
1190 1             deleted_repo_list = []
1191 1             added_repo_dict = {}
1192
1193             # iterate over the list of repos in the database that should be
1194             # added if not present
1195 1             for repo_name, db_repo in db_repo_dict.items():
1196 1                 try:
1197                     # check if it is already present
1198 1                     curr_repo_url = local_repo_dict.get(db_repo["name"])
1199 1                     repo_id = db_repo.get("_id")
1200 1                     if curr_repo_url != db_repo["url"]:
1201 1                         if curr_repo_url:
1202 0                             self.log.debug(
1203                                 "repo {} url changed, delete and and again".format(
1204                                     db_repo["url"]
1205                                 )
1206                             )
1207 0                             await self.repo_remove(cluster_uuid, db_repo["name"])
1208 0                             deleted_repo_list.append(repo_id)
1209
1210                         # add repo
1211 1                         self.log.debug("add repo {}".format(db_repo["name"]))
1212 1                         await self.repo_add(
1213                             cluster_uuid,
1214                             db_repo["name"],
1215                             db_repo["url"],
1216                             cert=db_repo.get("ca_cert"),
1217                             user=db_repo.get("user"),
1218                             password=db_repo.get("password"),
1219                             oci=db_repo.get("oci", False),
1220                         )
1221 1                         added_repo_dict[repo_id] = db_repo["name"]
1222 0                 except Exception as e:
1223 0                     raise K8sException(
1224                         "Error adding repo id: {}, err_msg: {} ".format(
1225                             repo_id, repr(e)
1226                         )
1227                     )
1228
1229             # Delete repos that are present but not in nbi_list
1230 1             for repo_name in local_repo_dict:
1231 1                 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1232 1                     self.log.debug("delete repo {}".format(repo_name))
1233 1                     try:
1234 1                         await self.repo_remove(cluster_uuid, repo_name)
1235 1                         deleted_repo_list.append(repo_name)
1236 0                     except Exception as e:
1237 0                         self.warning(
1238                             "Error deleting repo, name: {}, err_msg: {}".format(
1239                                 repo_name, str(e)
1240                             )
1241                         )
1242
1243 1             return deleted_repo_list, added_repo_dict
1244
1245 0         except K8sException:
1246 0             raise
1247 0         except Exception as e:
1248             # Do not raise errors synchronizing repos
1249 0             self.log.error("Error synchronizing repos: {}".format(e))
1250 0             raise Exception("Error synchronizing repos: {}".format(e))
1251
1252 1     def _get_db_repos_dict(self, repo_ids: list):
1253 1         db_repos_dict = {}
1254 1         for repo_id in repo_ids:
1255 1             db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1256 1             db_repos_dict[db_repo["name"]] = db_repo
1257 1         return db_repos_dict
1258
1259 1     """
1260     ####################################################################################
1261     ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1262     ####################################################################################
1263     """
1264
1265 1     @abc.abstractmethod
1266 1     def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1267         """
1268         Creates and returns base cluster and kube dirs and returns them.
1269         Also created helm3 dirs according to new directory specification, paths are
1270         not returned but assigned to helm environment variables
1271
1272         :param cluster_name:  cluster_name
1273         :return: Dictionary with config_paths and dictionary with helm environment variables
1274         """
1275
1276 1     @abc.abstractmethod
1277 1     async def _cluster_init(self, cluster_id, namespace, paths, env):
1278         """
1279         Implements the helm version dependent cluster initialization
1280         """
1281
1282 1     @abc.abstractmethod
1283 1     async def _instances_list(self, cluster_id):
1284         """
1285         Implements the helm version dependent helm instances list
1286         """
1287
1288 1     @abc.abstractmethod
1289 1     async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1290         """
1291         Implements the helm version dependent method to obtain services from a helm instance
1292         """
1293
1294 1     @abc.abstractmethod
1295 1     async def _status_kdu(
1296         self,
1297         cluster_id: str,
1298         kdu_instance: str,
1299         namespace: str = None,
1300         yaml_format: bool = False,
1301         show_error_log: bool = False,
1302     ) -> Union[str, dict]:
1303         """
1304         Implements the helm version dependent method to obtain status of a helm instance
1305         """
1306
1307 1     @abc.abstractmethod
1308 1     def _get_install_command(
1309         self,
1310         kdu_model,
1311         kdu_instance,
1312         namespace,
1313         params_str,
1314         version,
1315         atomic,
1316         timeout,
1317         kubeconfig,
1318     ) -> str:
1319         """
1320         Obtain command to be executed to delete the indicated instance
1321         """
1322
1323 1     @abc.abstractmethod
1324 1     def _get_upgrade_scale_command(
1325         self,
1326         kdu_model,
1327         kdu_instance,
1328         namespace,
1329         count,
1330         version,
1331         atomic,
1332         replicas,
1333         timeout,
1334         resource_name,
1335         kubeconfig,
1336     ) -> str:
1337         """Generates the command to scale a Helm Chart release
1338
1339         Args:
1340             kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1341             kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1342             namespace (str): Namespace where this KDU instance is deployed
1343             scale (int): Scale count
1344             version (str): Constraint with specific version of the Chart to use
1345             atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1346                 The --wait flag will be set automatically if --atomic is used
1347             replica_str (str): The key under resource_name key where the scale count is stored
1348             timeout (float): The time, in seconds, to wait
1349             resource_name (str): The KDU's resource to scale
1350             kubeconfig (str): Kubeconfig file path
1351
1352         Returns:
1353             str: command to scale a Helm Chart release
1354         """
1355
1356 1     @abc.abstractmethod
1357 1     def _get_upgrade_command(
1358         self,
1359         kdu_model,
1360         kdu_instance,
1361         namespace,
1362         params_str,
1363         version,
1364         atomic,
1365         timeout,
1366         kubeconfig,
1367         force,
1368     ) -> str:
1369         """Generates the command to upgrade a Helm Chart release
1370
1371         Args:
1372             kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1373             kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1374             namespace (str): Namespace where this KDU instance is deployed
1375             params_str (str): Params used to upgrade the Helm Chart release
1376             version (str): Constraint with specific version of the Chart to use
1377             atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1378                 The --wait flag will be set automatically if --atomic is used
1379             timeout (float): The time, in seconds, to wait
1380             kubeconfig (str): Kubeconfig file path
1381             force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1382         Returns:
1383             str: command to upgrade a Helm Chart release
1384         """
1385
1386 1     @abc.abstractmethod
1387 1     def _get_rollback_command(
1388         self, kdu_instance, namespace, revision, kubeconfig
1389     ) -> str:
1390         """
1391         Obtain command to be executed to rollback the indicated instance
1392         """
1393
1394 1     @abc.abstractmethod
1395 1     def _get_uninstall_command(
1396         self, kdu_instance: str, namespace: str, kubeconfig: str
1397     ) -> str:
1398         """
1399         Obtain command to be executed to delete the indicated instance
1400         """
1401
1402 1     @abc.abstractmethod
1403 1     def _get_inspect_command(
1404         self, show_command: str, kdu_model: str, repo_str: str, version: str
1405     ):
1406         """Generates the command to obtain the information about an Helm Chart package
1407             (´helm show ...´ command)
1408
1409         Args:
1410             show_command: the second part of the command (`helm show <show_command>`)
1411             kdu_model: The name or path of an Helm Chart
1412             repo_url: Helm Chart repository url
1413             version: constraint with specific version of the Chart to use
1414
1415         Returns:
1416             str: the generated Helm Chart command
1417         """
1418
1419 1     @abc.abstractmethod
1420 1     def _get_get_command(
1421         self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1422     ):
1423         """Obtain command to be executed to get information about the kdu instance."""
1424
1425 1     @abc.abstractmethod
1426 1     async def _uninstall_sw(self, cluster_id: str, namespace: str):
1427         """
1428         Method call to uninstall cluster software for helm. This method is dependent
1429         of helm version
1430         For Helm v2 it will be called when Tiller must be uninstalled
1431         For Helm v3 it does nothing and does not need to be callled
1432         """
1433
1434 1     @abc.abstractmethod
1435 1     def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1436         """
1437         Obtains the cluster repos identifiers
1438         """
1439
1440 1     """
1441     ####################################################################################
1442     ################################### P R I V A T E ##################################
1443     ####################################################################################
1444     """
1445
1446 1     @staticmethod
1447 1     def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1448 0         if os.path.exists(filename):
1449 0             return True
1450         else:
1451 0             msg = "File {} does not exist".format(filename)
1452 0             if exception_if_not_exists:
1453 0                 raise K8sException(msg)
1454
1455 1     @staticmethod
1456 1     def _remove_multiple_spaces(strobj):
1457 0         strobj = strobj.strip()
1458 0         while "  " in strobj:
1459 0             strobj = strobj.replace("  ", " ")
1460 0         return strobj
1461
1462 1     @staticmethod
1463 1     def _output_to_lines(output: str) -> list:
1464 0         output_lines = list()
1465 0         lines = output.splitlines(keepends=False)
1466 0         for line in lines:
1467 0             line = line.strip()
1468 0             if len(line) > 0:
1469 0                 output_lines.append(line)
1470 0         return output_lines
1471
1472 1     @staticmethod
1473 1     def _output_to_table(output: str) -> list:
1474 1         output_table = list()
1475 1         lines = output.splitlines(keepends=False)
1476 1         for line in lines:
1477 0             line = line.replace("\t", " ")
1478 0             line_list = list()
1479 0             output_table.append(line_list)
1480 0             cells = line.split(sep=" ")
1481 0             for cell in cells:
1482 0                 cell = cell.strip()
1483 0                 if len(cell) > 0:
1484 0                     line_list.append(cell)
1485 1         return output_table
1486
1487 1     @staticmethod
1488 1     def _parse_services(output: str) -> list:
1489 0         lines = output.splitlines(keepends=False)
1490 0         services = []
1491 0         for line in lines:
1492 0             line = line.replace("\t", " ")
1493 0             cells = line.split(sep=" ")
1494 0             if len(cells) > 0 and cells[0].startswith("service/"):
1495 0                 elems = cells[0].split(sep="/")
1496 0                 if len(elems) > 1:
1497 0                     services.append(elems[1])
1498 0         return services
1499
1500 1     @staticmethod
1501 1     def _get_deep(dictionary: dict, members: tuple):
1502 1         target = dictionary
1503 1         value = None
1504 1         try:
1505 1             for m in members:
1506 1                 value = target.get(m)
1507 0                 if not value:
1508 0                     return None
1509                 else:
1510 0                     target = value
1511 1         except Exception:
1512 1             pass
1513 1         return value
1514
1515     # find key:value in several lines
1516 1     @staticmethod
1517 1     def _find_in_lines(p_lines: list, p_key: str) -> str:
1518 0         for line in p_lines:
1519 0             try:
1520 0                 if line.startswith(p_key + ":"):
1521 0                     parts = line.split(":")
1522 0                     the_value = parts[1].strip()
1523 0                     return the_value
1524 0             except Exception:
1525                 # ignore it
1526 0                 pass
1527 0         return None
1528
1529 1     @staticmethod
1530 1     def _lower_keys_list(input_list: list):
1531         """
1532         Transform the keys in a list of dictionaries to lower case and returns a new list
1533         of dictionaries
1534         """
1535 0         new_list = []
1536 0         if input_list:
1537 0             for dictionary in input_list:
1538 0                 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1539 0                 new_list.append(new_dict)
1540 0         return new_list
1541
1542 1     async def _local_async_exec(
1543         self,
1544         command: str,
1545         raise_exception_on_error: bool = False,
1546         show_error_log: bool = True,
1547         encode_utf8: bool = False,
1548         env: dict = None,
1549     ) -> tuple[str, int]:
1550 0         command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1551 0         self.log.debug(
1552             "Executing async local command: {}, env: {}".format(command, env)
1553         )
1554
1555         # split command
1556 0         command = shlex.split(command)
1557
1558 0         environ = os.environ.copy()
1559 0         if env:
1560 0             environ.update(env)
1561
1562 0         try:
1563 0             async with self.cmd_lock:
1564 0                 process = await asyncio.create_subprocess_exec(
1565                     *command,
1566                     stdout=asyncio.subprocess.PIPE,
1567                     stderr=asyncio.subprocess.PIPE,
1568                     env=environ,
1569                 )
1570
1571                 # wait for command terminate
1572 0                 stdout, stderr = await process.communicate()
1573
1574 0                 return_code = process.returncode
1575
1576 0             output = ""
1577 0             if stdout:
1578 0                 output = stdout.decode("utf-8").strip()
1579                 # output = stdout.decode()
1580 0             if stderr:
1581 0                 output = stderr.decode("utf-8").strip()
1582                 # output = stderr.decode()
1583
1584 0             if return_code != 0 and show_error_log:
1585 0                 self.log.debug(
1586                     "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1587                 )
1588             else:
1589 0                 self.log.debug("Return code: {}".format(return_code))
1590
1591 0             if raise_exception_on_error and return_code != 0:
1592 0                 raise K8sException(output)
1593
1594 0             if encode_utf8:
1595 0                 output = output.encode("utf-8").strip()
1596 0                 output = str(output).replace("\\n", "\n")
1597
1598 0             return output, return_code
1599
1600 0         except asyncio.CancelledError:
1601             # first, kill the process if it is still running
1602 0             if process.returncode is None:
1603 0                 process.kill()
1604 0             raise
1605 0         except K8sException:
1606 0             raise
1607 0         except Exception as e:
1608 0             msg = "Exception executing command: {} -> {}".format(command, e)
1609 0             self.log.error(msg)
1610 0             if raise_exception_on_error:
1611 0                 raise K8sException(e) from e
1612             else:
1613 0                 return "", -1
1614
1615 1     async def _local_async_exec_pipe(
1616         self,
1617         command1: str,
1618         command2: str,
1619         raise_exception_on_error: bool = True,
1620         show_error_log: bool = True,
1621         encode_utf8: bool = False,
1622         env: dict = None,
1623     ):
1624 0         command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1625 0         command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1626 0         command = "{} | {}".format(command1, command2)
1627 0         self.log.debug(
1628             "Executing async local command: {}, env: {}".format(command, env)
1629         )
1630
1631         # split command
1632 0         command1 = shlex.split(command1)
1633 0         command2 = shlex.split(command2)
1634
1635 0         environ = os.environ.copy()
1636 0         if env:
1637 0             environ.update(env)
1638
1639 0         try:
1640 0             async with self.cmd_lock:
1641 0                 read, write = os.pipe()
1642 0                 process_1 = await asyncio.create_subprocess_exec(
1643                     *command1, stdout=write, env=environ
1644                 )
1645 0                 os.close(write)
1646 0                 process_2 = await asyncio.create_subprocess_exec(
1647                     *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1648                 )
1649 0                 os.close(read)
1650 0                 stdout, stderr = await process_2.communicate()
1651
1652 0                 return_code = process_2.returncode
1653
1654 0             output = ""
1655 0             if stdout:
1656 0                 output = stdout.decode("utf-8").strip()
1657                 # output = stdout.decode()
1658 0             if stderr:
1659 0                 output = stderr.decode("utf-8").strip()
1660                 # output = stderr.decode()
1661
1662 0             if return_code != 0 and show_error_log:
1663 0                 self.log.debug(
1664                     "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1665                 )
1666             else:
1667 0                 self.log.debug("Return code: {}".format(return_code))
1668
1669 0             if raise_exception_on_error and return_code != 0:
1670 0                 raise K8sException(output)
1671
1672 0             if encode_utf8:
1673 0                 output = output.encode("utf-8").strip()
1674 0                 output = str(output).replace("\\n", "\n")
1675
1676 0             return output, return_code
1677 0         except asyncio.CancelledError:
1678             # first, kill the processes if they are still running
1679 0             for process in (process_1, process_2):
1680 0                 if process.returncode is None:
1681 0                     process.kill()
1682 0             raise
1683 0         except K8sException:
1684 0             raise
1685 0         except Exception as e:
1686 0             msg = "Exception executing command: {} -> {}".format(command, e)
1687 0             self.log.error(msg)
1688 0             if raise_exception_on_error:
1689 0                 raise K8sException(e) from e
1690             else:
1691 0                 return "", -1
1692
1693 1     async def _get_service(self, cluster_id, service_name, namespace):
1694         """
1695         Obtains the data of the specified service in the k8cluster.
1696
1697         :param cluster_id: id of a K8s cluster known by OSM
1698         :param service_name: name of the K8s service in the specified namespace
1699         :param namespace: K8s namespace used by the KDU instance
1700         :return: If successful, it will return a service with the following data:
1701         - `name` of the service
1702         - `type` type of service in the k8 cluster
1703         - `ports` List of ports offered by the service, for each port includes at least
1704         name, port, protocol
1705         - `cluster_ip` Internal ip to be used inside k8s cluster
1706         - `external_ip` List of external ips (in case they are available)
1707         """
1708
1709         # init config, env
1710 1         paths, env = self._init_paths_env(
1711             cluster_name=cluster_id, create_if_not_exist=True
1712         )
1713
1714 1         command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1715             self.kubectl_command,
1716             paths["kube_config"],
1717             quote(namespace),
1718             quote(service_name),
1719         )
1720
1721 1         output, _rc = await self._local_async_exec(
1722             command=command, raise_exception_on_error=True, env=env
1723         )
1724
1725 1         data = yaml.load(output, Loader=yaml.SafeLoader)
1726
1727 1         service = {
1728             "name": service_name,
1729             "type": self._get_deep(data, ("spec", "type")),
1730             "ports": self._get_deep(data, ("spec", "ports")),
1731             "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1732         }
1733 1         if service["type"] == "LoadBalancer":
1734 0             ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1735 0             ip_list = [elem["ip"] for elem in ip_map_list]
1736 0             service["external_ip"] = ip_list
1737
1738 1         return service
1739
1740 1     async def _exec_get_command(
1741         self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1742     ):
1743         """Obtains information about the kdu instance."""
1744
1745 1         full_command = self._get_get_command(
1746             get_command, kdu_instance, namespace, kubeconfig
1747         )
1748
1749 1         output, _rc = await self._local_async_exec(command=full_command)
1750
1751 1         return output
1752
1753 1     async def _exec_inspect_command(
1754         self, inspect_command: str, kdu_model: str, repo_url: str = None
1755     ):
1756         """Obtains information about an Helm Chart package (´helm show´ command)
1757
1758         Args:
1759             inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1760             kdu_model: The name or path of an Helm Chart
1761             repo_url: Helm Chart repository url
1762
1763         Returns:
1764             str: the requested info about the Helm Chart package
1765         """
1766
1767 1         repo_str = ""
1768 1         if repo_url:
1769 1             repo_str = " --repo {}".format(quote(repo_url))
1770
1771             # Obtain the Chart's name and store it in the var kdu_model
1772 1             kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1773
1774 1         kdu_model, version = self._split_version(kdu_model)
1775 1         if version:
1776 1             version_str = "--version {}".format(quote(version))
1777         else:
1778 0             version_str = ""
1779
1780 1         full_command = self._get_inspect_command(
1781             show_command=inspect_command,
1782             kdu_model=quote(kdu_model),
1783             repo_str=repo_str,
1784             version=version_str,
1785         )
1786
1787 1         output, _ = await self._local_async_exec(command=full_command)
1788
1789 1         return output
1790
1791 1     async def _get_replica_count_url(
1792         self,
1793         kdu_model: str,
1794         repo_url: str = None,
1795         resource_name: str = None,
1796     ) -> tuple[int, str]:
1797         """Get the replica count value in the Helm Chart Values.
1798
1799         Args:
1800             kdu_model: The name or path of an Helm Chart
1801             repo_url: Helm Chart repository url
1802             resource_name: Resource name
1803
1804         Returns:
1805             A tuple with:
1806             - The number of replicas of the specific instance; if not found, returns None; and
1807             - The string corresponding to the replica count key in the Helm values
1808         """
1809
1810 1         kdu_values = yaml.load(
1811             await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1812             Loader=yaml.SafeLoader,
1813         )
1814
1815 1         self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1816
1817 1         if not kdu_values:
1818 0             raise K8sException(
1819                 "kdu_values not found for kdu_model {}".format(kdu_model)
1820             )
1821
1822 1         if resource_name:
1823 1             kdu_values = kdu_values.get(resource_name, None)
1824
1825 1         if not kdu_values:
1826 0             msg = "resource {} not found in the values in model {}".format(
1827                 resource_name, kdu_model
1828             )
1829 0             self.log.error(msg)
1830 0             raise K8sException(msg)
1831
1832 1         duplicate_check = False
1833
1834 1         replica_str = ""
1835 1         replicas = None
1836
1837 1         if kdu_values.get("replicaCount") is not None:
1838 1             replicas = kdu_values["replicaCount"]
1839 1             replica_str = "replicaCount"
1840 1         elif kdu_values.get("replicas") is not None:
1841 1             duplicate_check = True
1842 1             replicas = kdu_values["replicas"]
1843 1             replica_str = "replicas"
1844         else:
1845 0             if resource_name:
1846 0                 msg = (
1847                     "replicaCount or replicas not found in the resource"
1848                     "{} values in model {}. Cannot be scaled".format(
1849                         resource_name, kdu_model
1850                     )
1851                 )
1852             else:
1853 0                 msg = (
1854                     "replicaCount or replicas not found in the values"
1855                     "in model {}. Cannot be scaled".format(kdu_model)
1856                 )
1857 0             self.log.error(msg)
1858 0             raise K8sException(msg)
1859
1860         # Control if replicas and replicaCount exists at the same time
1861 1         msg = "replicaCount and replicas are exists at the same time"
1862 1         if duplicate_check:
1863 1             if "replicaCount" in kdu_values:
1864 0                 self.log.error(msg)
1865 0                 raise K8sException(msg)
1866         else:
1867 1             if "replicas" in kdu_values:
1868 0                 self.log.error(msg)
1869 0                 raise K8sException(msg)
1870
1871 1         return replicas, replica_str
1872
1873 1     async def _get_replica_count_instance(
1874         self,
1875         kdu_instance: str,
1876         namespace: str,
1877         kubeconfig: str,
1878         resource_name: str = None,
1879     ) -> int:
1880         """Get the replica count value in the instance.
1881
1882         Args:
1883             kdu_instance: The name of the KDU instance
1884             namespace: KDU instance namespace
1885             kubeconfig:
1886             resource_name: Resource name
1887
1888         Returns:
1889             The number of replicas of the specific instance; if not found, returns None
1890         """
1891
1892 0         kdu_values = yaml.load(
1893             await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1894             Loader=yaml.SafeLoader,
1895         )
1896
1897 0         self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1898
1899 0         replicas = None
1900
1901 0         if kdu_values:
1902 0             resource_values = (
1903                 kdu_values.get(resource_name, None) if resource_name else None
1904             )
1905
1906 0             for replica_str in ("replicaCount", "replicas"):
1907 0                 if resource_values:
1908 0                     replicas = resource_values.get(replica_str)
1909                 else:
1910 0                     replicas = kdu_values.get(replica_str)
1911
1912 0                 if replicas is not None:
1913 0                     break
1914
1915 0         return replicas
1916
1917 1     async def _store_status(
1918         self,
1919         cluster_id: str,
1920         operation: str,
1921         kdu_instance: str,
1922         namespace: str = None,
1923         db_dict: dict = None,
1924     ) -> None:
1925         """
1926         Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1927
1928         :param cluster_id (str): the cluster where the KDU instance is deployed
1929         :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1930         :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1931         :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1932         :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1933         values for the keys:
1934             - "collection": The Mongo DB collection to write to
1935             - "filter": The query filter to use in the update process
1936             - "path": The dot separated keys which targets the object to be updated
1937         Defaults to None.
1938         """
1939
1940 1         try:
1941 1             detailed_status = await self._status_kdu(
1942                 cluster_id=cluster_id,
1943                 kdu_instance=kdu_instance,
1944                 yaml_format=False,
1945                 namespace=namespace,
1946             )
1947
1948 1             status = detailed_status.get("info").get("description")
1949 1             self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1950
1951             # write status to db
1952 1             result = await self.write_app_status_to_db(
1953                 db_dict=db_dict,
1954                 status=str(status),
1955                 detailed_status=str(detailed_status),
1956                 operation=operation,
1957             )
1958
1959 1             if not result:
1960 0                 self.log.info("Error writing in database. Task exiting...")
1961
1962 0         except asyncio.CancelledError as e:
1963 0             self.log.warning(
1964                 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1965             )
1966 0         except Exception as e:
1967 0             self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1968
1969     # params for use in -f file
1970     # returns values file option and filename (in order to delete it at the end)
1971 1     def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
1972 1         if params and len(params) > 0:
1973 0             self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1974
1975 0             def get_random_number():
1976 0                 r = random.SystemRandom().randint(1, 99999999)
1977 0                 s = str(r)
1978 0                 while len(s) < 10:
1979 0                     s = "0" + s
1980 0                 return s
1981
1982 0             params2 = dict()
1983 0             for key in params:
1984 0                 value = params.get(key)
1985 0                 if "!!yaml" in str(value):
1986 0                     value = yaml.safe_load(value[7:])
1987 0                 params2[key] = value
1988
1989 0             values_file = get_random_number() + ".yaml"
1990 0             with open(values_file, "w") as stream:
1991 0                 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1992
1993 0             return "-f {}".format(values_file), values_file
1994
1995 1         return "", None
1996
1997     # params for use in --set option
1998 1     @staticmethod
1999 1     def _params_to_set_option(params: dict) -> str:
2000 1         pairs = [
2001             f"{quote(str(key))}={quote(str(value))}"
2002             for key, value in params.items()
2003             if value is not None
2004         ]
2005 1         if not pairs:
2006 0             return ""
2007 1         return "--set " + ",".join(pairs)
2008
2009 1     @staticmethod
2010 1     def generate_kdu_instance_name(**kwargs):
2011 0         chart_name = kwargs["kdu_model"]
2012         # check embeded chart (file or dir)
2013 0         if chart_name.startswith("/"):
2014             # extract file or directory name
2015 0             chart_name = chart_name[chart_name.rfind("/") + 1 :]
2016         # check URL
2017 0         elif "://" in chart_name:
2018             # extract last portion of URL
2019 0             chart_name = chart_name[chart_name.rfind("/") + 1 :]
2020
2021 0         name = ""
2022 0         for c in chart_name:
2023 0             if c.isalpha() or c.isnumeric():
2024 0                 name += c
2025             else:
2026 0                 name += "-"
2027 0         if len(name) > 35:
2028 0             name = name[0:35]
2029
2030         # if does not start with alpha character, prefix 'a'
2031 0         if not name[0].isalpha():
2032 0             name = "a" + name
2033
2034 0         name += "-"
2035
2036 0         def get_random_number():
2037 0             r = random.SystemRandom().randint(1, 99999999)
2038 0             s = str(r)
2039 0             s = s.rjust(10, "0")
2040 0             return s
2041
2042 0         name = name + get_random_number()
2043 0         return name.lower()
2044
2045 1     def _split_version(self, kdu_model: str) -> tuple[str, str]:
2046 1         version = None
2047 1         if (
2048             not (
2049                 self._is_helm_chart_a_file(kdu_model)
2050                 or self._is_helm_chart_a_url(kdu_model)
2051             )
2052             and ":" in kdu_model
2053         ):
2054 1             parts = kdu_model.split(sep=":")
2055 1             if len(parts) == 2:
2056 1                 version = str(parts[1])
2057 1                 kdu_model = parts[0]
2058 1         return kdu_model, version
2059
2060 1     def _split_repo(self, kdu_model: str) -> tuple[str, str]:
2061         """Obtain the Helm Chart's repository and Chart's names from the KDU model
2062
2063         Args:
2064             kdu_model (str): Associated KDU model
2065
2066         Returns:
2067             (str, str): Tuple with the Chart name in index 0, and the repo name
2068                         in index 2; if there was a problem finding them, return None
2069                         for both
2070         """
2071
2072 1         chart_name = None
2073 1         repo_name = None
2074
2075 1         idx = kdu_model.find("/")
2076 1         if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
2077 1             chart_name = kdu_model[idx + 1 :]
2078 1             repo_name = kdu_model[:idx]
2079
2080 1         return chart_name, repo_name
2081
2082 1     async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2083         """Obtain the Helm repository for an Helm Chart
2084
2085         Args:
2086             kdu_model (str): the KDU model associated with the Helm Chart instantiation
2087             cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2088
2089         Returns:
2090             str: the repository URL; if Helm Chart is a local one, the function returns None
2091         """
2092
2093 1         _, repo_name = self._split_repo(kdu_model=kdu_model)
2094
2095 1         repo_url = None
2096 1         if repo_name:
2097             # Find repository link
2098 1             local_repo_list = await self.repo_list(cluster_uuid)
2099 1             for repo in local_repo_list:
2100 1                 if repo["name"] == repo_name:
2101 1                     repo_url = repo["url"]
2102 1                     break  # it is not necessary to continue the loop if the repo link was found...
2103
2104 1         return repo_url
2105
2106 1     def _repo_to_oci_url(self, repo):
2107 0         db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
2108 0         if db_repo and "oci" in db_repo:
2109 0             return db_repo.get("url")
2110
2111 1     async def _prepare_helm_chart(self, kdu_model, cluster_id):
2112         # e.g.: "stable/openldap", "1.0"
2113 1         kdu_model, version = self._split_version(kdu_model)
2114         # e.g.: "openldap, stable"
2115 1         chart_name, repo = self._split_repo(kdu_model)
2116 1         if repo and chart_name:  # repo/chart case
2117 1             oci_url = self._repo_to_oci_url(repo)
2118 1             if oci_url:  # oci does not require helm repo update
2119 0                 kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}"  # urljoin doesn't work for oci schema
2120             else:
2121 1                 await self.repo_update(cluster_id, repo)
2122 1         return kdu_model, version
2123
2124 1     async def create_certificate(
2125         self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2126     ):
2127 0         paths, env = self._init_paths_env(
2128             cluster_name=cluster_uuid, create_if_not_exist=True
2129         )
2130 0         kubectl = Kubectl(config_file=paths["kube_config"])
2131 0         await kubectl.create_certificate(
2132             namespace=namespace,
2133             name=name,
2134             dns_prefix=dns_prefix,
2135             secret_name=secret_name,
2136             usages=[usage],
2137             issuer_name="ca-issuer",
2138         )
2139
2140 1     async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2141 0         paths, env = self._init_paths_env(
2142             cluster_name=cluster_uuid, create_if_not_exist=True
2143         )
2144 0         kubectl = Kubectl(config_file=paths["kube_config"])
2145 0         await kubectl.delete_certificate(namespace, certificate_name)
2146
2147 1     async def create_namespace(
2148         self,
2149         namespace,
2150         cluster_uuid,
2151         labels,
2152     ):
2153         """
2154         Create a namespace in a specific cluster
2155
2156         :param namespace:    Namespace to be created
2157         :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2158         :param labels:       Dictionary with labels for the new namespace
2159         :returns: None
2160         """
2161 0         paths, env = self._init_paths_env(
2162             cluster_name=cluster_uuid, create_if_not_exist=True
2163         )
2164 0         kubectl = Kubectl(config_file=paths["kube_config"])
2165 0         await kubectl.create_namespace(
2166             name=namespace,
2167             labels=labels,
2168         )
2169
2170 1     async def delete_namespace(
2171         self,
2172         namespace,
2173         cluster_uuid,
2174     ):
2175         """
2176         Delete a namespace in a specific cluster
2177
2178         :param namespace: namespace to be deleted
2179         :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2180         :returns: None
2181         """
2182 0         paths, env = self._init_paths_env(
2183             cluster_name=cluster_uuid, create_if_not_exist=True
2184         )
2185 0         kubectl = Kubectl(config_file=paths["kube_config"])
2186 0         await kubectl.delete_namespace(
2187             name=namespace,
2188         )
2189
2190 1     async def copy_secret_data(
2191         self,
2192         src_secret: str,
2193         dst_secret: str,
2194         cluster_uuid: str,
2195         data_key: str,
2196         src_namespace: str = "osm",
2197         dst_namespace: str = "osm",
2198     ):
2199         """
2200         Copy a single key and value from an existing secret to a new one
2201
2202         :param src_secret: name of the existing secret
2203         :param dst_secret: name of the new secret
2204         :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2205         :param data_key: key of the existing secret to be copied
2206         :param src_namespace: Namespace of the existing secret
2207         :param dst_namespace: Namespace of the new secret
2208         :returns: None
2209         """
2210 0         paths, env = self._init_paths_env(
2211             cluster_name=cluster_uuid, create_if_not_exist=True
2212         )
2213 0         kubectl = Kubectl(config_file=paths["kube_config"])
2214 0         secret_data = await kubectl.get_secret_content(
2215             name=src_secret,
2216             namespace=src_namespace,
2217         )
2218         # Only the corresponding data_key value needs to be copy
2219 0         data = {data_key: secret_data.get(data_key)}
2220 0         await kubectl.create_secret(
2221             name=dst_secret,
2222             data=data,
2223             namespace=dst_namespace,
2224             secret_type="Opaque",
2225         )
2226
2227 1     async def setup_default_rbac(
2228         self,
2229         name,
2230         namespace,
2231         cluster_uuid,
2232         api_groups,
2233         resources,
2234         verbs,
2235         service_account,
2236     ):
2237         """
2238         Create a basic RBAC for a new namespace.
2239
2240         :param name: name of both Role and Role Binding
2241         :param namespace: K8s namespace
2242         :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2243         :param api_groups: Api groups to be allowed in Policy Rule
2244         :param resources: Resources to be allowed in Policy Rule
2245         :param verbs: Verbs to be allowed in Policy Rule
2246         :param service_account: Service Account name used to bind the Role
2247         :returns: None
2248         """
2249 0         paths, env = self._init_paths_env(
2250             cluster_name=cluster_uuid, create_if_not_exist=True
2251         )
2252 0         kubectl = Kubectl(config_file=paths["kube_config"])
2253 0         await kubectl.create_role(
2254             name=name,
2255             labels={},
2256             namespace=namespace,
2257             api_groups=api_groups,
2258             resources=resources,
2259             verbs=verbs,
2260         )
2261 0         await kubectl.create_role_binding(
2262             name=name,
2263             labels={},
2264             namespace=namespace,
2265             role_name=name,
2266             sa_name=service_account,
2267         )