Code Coverage

Cobertura Coverage Report > n2vc >

k8s_helm_base_conn.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
k8s_helm_base_conn.py
100%
1/1
59%
439/741
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
k8s_helm_base_conn.py
59%
439/741
N/A

Source

n2vc/k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 1 import abc
23 1 import asyncio
24 1 from typing import Union
25 1 import random
26 1 import time
27 1 import shlex
28 1 import shutil
29 1 import stat
30 1 import os
31 1 import yaml
32 1 from uuid import uuid4
33
34 1 from n2vc.config import EnvironConfig
35 1 from n2vc.exceptions import K8sException
36 1 from n2vc.k8s_conn import K8sConnector
37 1 from n2vc.kubectl import Kubectl
38
39
40 1 class K8sHelmBaseConnector(K8sConnector):
41
42     """
43     ####################################################################################
44     ################################### P U B L I C ####################################
45     ####################################################################################
46     """
47
48 1     service_account = "osm"
49
50 1     def __init__(
51         self,
52         fs: object,
53         db: object,
54         kubectl_command: str = "/usr/bin/kubectl",
55         helm_command: str = "/usr/bin/helm",
56         log: object = None,
57         on_update_db=None,
58     ):
59         """
60
61         :param fs: file system for kubernetes and helm configuration
62         :param db: database object to write current operation status
63         :param kubectl_command: path to kubectl executable
64         :param helm_command: path to helm executable
65         :param log: logger
66         :param on_update_db: callback called when k8s connector updates database
67         """
68
69         # parent class
70 1         K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
71
72 1         self.log.info("Initializing K8S Helm connector")
73
74 1         self.config = EnvironConfig()
75         # random numbers for release name generation
76 1         random.seed(time.time())
77
78         # the file system
79 1         self.fs = fs
80
81         # exception if kubectl is not installed
82 1         self.kubectl_command = kubectl_command
83 1         self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
84
85         # exception if helm is not installed
86 1         self._helm_command = helm_command
87 1         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
88
89         # obtain stable repo url from config or apply default
90 1         self._stable_repo_url = self.config.get("stablerepourl")
91 1         if self._stable_repo_url == "None":
92 0             self._stable_repo_url = None
93
94         # Lock to avoid concurrent execution of helm commands
95 1         self.cmd_lock = asyncio.Lock()
96
97 1     def _get_namespace(self, cluster_uuid: str) -> str:
98         """
99         Obtains the namespace used by the cluster with the uuid passed by argument
100
101         param: cluster_uuid: cluster's uuid
102         """
103
104         # first, obtain the cluster corresponding to the uuid passed by argument
105 1         k8scluster = self.db.get_one(
106             "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
107         )
108 1         return k8scluster.get("namespace")
109
110 1     async def init_env(
111         self,
112         k8s_creds: str,
113         namespace: str = "kube-system",
114         reuse_cluster_uuid=None,
115         **kwargs,
116     ) -> (str, bool):
117         """
118         It prepares a given K8s cluster environment to run Charts
119
120         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
121             '.kube/config'
122         :param namespace: optional namespace to be used for helm. By default,
123             'kube-system' will be used
124         :param reuse_cluster_uuid: existing cluster uuid for reuse
125         :param kwargs: Additional parameters (None yet)
126         :return: uuid of the K8s cluster and True if connector has installed some
127             software in the cluster
128         (on error, an exception will be raised)
129         """
130
131 1         if reuse_cluster_uuid:
132 1             cluster_id = reuse_cluster_uuid
133         else:
134 0             cluster_id = str(uuid4())
135
136 1         self.log.debug(
137             "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
138         )
139
140 1         paths, env = self._init_paths_env(
141             cluster_name=cluster_id, create_if_not_exist=True
142         )
143 1         mode = stat.S_IRUSR | stat.S_IWUSR
144 1         with open(paths["kube_config"], "w", mode) as f:
145 1             f.write(k8s_creds)
146 1         os.chmod(paths["kube_config"], 0o600)
147
148         # Code with initialization specific of helm version
149 1         n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
150
151         # sync fs with local data
152 1         self.fs.reverse_sync(from_path=cluster_id)
153
154 1         self.log.info("Cluster {} initialized".format(cluster_id))
155
156 1         return cluster_id, n2vc_installed_sw
157
158 1     async def repo_add(
159         self,
160         cluster_uuid: str,
161         name: str,
162         url: str,
163         repo_type: str = "chart",
164         cert: str = None,
165         user: str = None,
166         password: str = None,
167     ):
168 1         self.log.debug(
169             "Cluster {}, adding {} repository {}. URL: {}".format(
170                 cluster_uuid, repo_type, name, url
171             )
172         )
173
174         # init_env
175 1         paths, env = self._init_paths_env(
176             cluster_name=cluster_uuid, create_if_not_exist=True
177         )
178
179         # sync local dir
180 1         self.fs.sync(from_path=cluster_uuid)
181
182         # helm repo add name url
183 1         command = ("env KUBECONFIG={} {} repo add {} {}").format(
184             paths["kube_config"], self._helm_command, name, url
185         )
186
187 1         if cert:
188 0             temp_cert_file = os.path.join(
189                 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
190             )
191 0             os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
192 0             with open(temp_cert_file, "w") as the_cert:
193 0                 the_cert.write(cert)
194 0             command += " --ca-file {}".format(temp_cert_file)
195
196 1         if user:
197 0             command += " --username={}".format(user)
198
199 1         if password:
200 0             command += " --password={}".format(password)
201
202 1         self.log.debug("adding repo: {}".format(command))
203 1         await self._local_async_exec(
204             command=command, raise_exception_on_error=True, env=env
205         )
206
207         # helm repo update
208 1         command = "env KUBECONFIG={} {} repo update {}".format(
209             paths["kube_config"], self._helm_command, name
210         )
211 1         self.log.debug("updating repo: {}".format(command))
212 1         await self._local_async_exec(
213             command=command, raise_exception_on_error=False, env=env
214         )
215
216         # sync fs
217 1         self.fs.reverse_sync(from_path=cluster_uuid)
218
219 1     async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
220 1         self.log.debug(
221             "Cluster {}, updating {} repository {}".format(
222                 cluster_uuid, repo_type, name
223             )
224         )
225
226         # init_env
227 1         paths, env = self._init_paths_env(
228             cluster_name=cluster_uuid, create_if_not_exist=True
229         )
230
231         # sync local dir
232 1         self.fs.sync(from_path=cluster_uuid)
233
234         # helm repo update
235 1         command = "{} repo update {}".format(self._helm_command, name)
236 1         self.log.debug("updating repo: {}".format(command))
237 1         await self._local_async_exec(
238             command=command, raise_exception_on_error=False, env=env
239         )
240
241         # sync fs
242 1         self.fs.reverse_sync(from_path=cluster_uuid)
243
244 1     async def repo_list(self, cluster_uuid: str) -> list:
245         """
246         Get the list of registered repositories
247
248         :return: list of registered repositories: [ (name, url) .... ]
249         """
250
251 1         self.log.debug("list repositories for cluster {}".format(cluster_uuid))
252
253         # config filename
254 1         paths, env = self._init_paths_env(
255             cluster_name=cluster_uuid, create_if_not_exist=True
256         )
257
258         # sync local dir
259 1         self.fs.sync(from_path=cluster_uuid)
260
261 1         command = "env KUBECONFIG={} {} repo list --output yaml".format(
262             paths["kube_config"], self._helm_command
263         )
264
265         # Set exception to false because if there are no repos just want an empty list
266 1         output, _rc = await self._local_async_exec(
267             command=command, raise_exception_on_error=False, env=env
268         )
269
270         # sync fs
271 1         self.fs.reverse_sync(from_path=cluster_uuid)
272
273 1         if _rc == 0:
274 1             if output and len(output) > 0:
275 0                 repos = yaml.load(output, Loader=yaml.SafeLoader)
276                 # unify format between helm2 and helm3 setting all keys lowercase
277 0                 return self._lower_keys_list(repos)
278             else:
279 1                 return []
280         else:
281 0             return []
282
283 1     async def repo_remove(self, cluster_uuid: str, name: str):
284 1         self.log.debug(
285             "remove {} repositories for cluster {}".format(name, cluster_uuid)
286         )
287
288         # init env, paths
289 1         paths, env = self._init_paths_env(
290             cluster_name=cluster_uuid, create_if_not_exist=True
291         )
292
293         # sync local dir
294 1         self.fs.sync(from_path=cluster_uuid)
295
296 1         command = "env KUBECONFIG={} {} repo remove {}".format(
297             paths["kube_config"], self._helm_command, name
298         )
299 1         await self._local_async_exec(
300             command=command, raise_exception_on_error=True, env=env
301         )
302
303         # sync fs
304 1         self.fs.reverse_sync(from_path=cluster_uuid)
305
306 1     async def reset(
307         self,
308         cluster_uuid: str,
309         force: bool = False,
310         uninstall_sw: bool = False,
311         **kwargs,
312     ) -> bool:
313         """Reset a cluster
314
315         Resets the Kubernetes cluster by removing the helm deployment that represents it.
316
317         :param cluster_uuid: The UUID of the cluster to reset
318         :param force: Boolean to force the reset
319         :param uninstall_sw: Boolean to force the reset
320         :param kwargs: Additional parameters (None yet)
321         :return: Returns True if successful or raises an exception.
322         """
323 1         namespace = self._get_namespace(cluster_uuid=cluster_uuid)
324 1         self.log.debug(
325             "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
326                 cluster_uuid, uninstall_sw
327             )
328         )
329
330         # sync local dir
331 1         self.fs.sync(from_path=cluster_uuid)
332
333         # uninstall releases if needed.
334 1         if uninstall_sw:
335 1             releases = await self.instances_list(cluster_uuid=cluster_uuid)
336 1             if len(releases) > 0:
337 1                 if force:
338 1                     for r in releases:
339 1                         try:
340 1                             kdu_instance = r.get("name")
341 1                             chart = r.get("chart")
342 1                             self.log.debug(
343                                 "Uninstalling {} -> {}".format(chart, kdu_instance)
344                             )
345 1                             await self.uninstall(
346                                 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
347                             )
348 0                         except Exception as e:
349                             # will not raise exception as it was found
350                             # that in some cases of previously installed helm releases it
351                             # raised an error
352 0                             self.log.warn(
353                                 "Error uninstalling release {}: {}".format(
354                                     kdu_instance, e
355                                 )
356                             )
357                 else:
358 0                     msg = (
359                         "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
360                     ).format(cluster_uuid)
361 0                     self.log.warn(msg)
362 0                     uninstall_sw = (
363                         False  # Allow to remove k8s cluster without removing Tiller
364                     )
365
366 1         if uninstall_sw:
367 1             await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
368
369         # delete cluster directory
370 1         self.log.debug("Removing directory {}".format(cluster_uuid))
371 1         self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
372         # Remove also local directorio if still exist
373 1         direct = self.fs.path + "/" + cluster_uuid
374 1         shutil.rmtree(direct, ignore_errors=True)
375
376 1         return True
377
378 1     def _is_helm_chart_a_file(self, chart_name: str):
379 1         return chart_name.count("/") > 1
380
    async def _install_impl(
        self,
        cluster_id: str,
        kdu_model: str,
        paths: dict,
        env: dict,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """Run the helm install for a KDU instance and record its status.

        :param cluster_id: internal cluster id, used to resolve paths/env
        :param kdu_model: chart reference, optionally suffixed with a version
        :param paths: paths dict (NOTE: immediately overwritten below)
        :param env: environment dict (NOTE: immediately overwritten below)
        :param kdu_instance: helm release name
        :param atomic: if True, run helm and status updates concurrently
        :param timeout: timeout handed to the helm install command
        :param params: optional chart values, written to a temporary file
        :param db_dict: database location info consumed by _store_status
        :param kdu_name: not used in this method
        :param namespace: target namespace for the release
        :raises K8sException: if the helm command exits with non-zero status
        """
        # init env, paths
        # NOTE(review): this overwrites the `paths` and `env` arguments —
        # confirm whether callers still need to pass them at all.
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str: produces the values-file command fragment and the
        # temporary file to clean up afterwards (None when params is empty)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        # version: split an optional version suffix off the model reference
        kdu_model, version = self._split_version(kdu_model)

        # refresh the repo index first so the requested chart version resolves
        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_id, repo)

        # build the helm-version-specific install command line
        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task, concurrently with the install
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            # wait for execution task only; the status task is stopped below
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:
            # non-atomic: run helm synchronously, no concurrent status writes
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status after the command has finished
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)
474
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        namespace: str = None,
        force: bool = False,
    ):
        """Upgrade a deployed KDU instance to a (possibly new) chart/version.

        :param cluster_uuid: UUID of the target cluster
        :param kdu_instance: helm release name to upgrade
        :param kdu_model: chart reference, optionally suffixed with a version
        :param atomic: if True, run helm and status updates concurrently
        :param timeout: timeout handed to the helm upgrade command
        :param params: optional chart values, written to a temporary file
        :param db_dict: database location info consumed by _store_status
        :param namespace: release namespace; looked up from the instance when
            not provided
        :param force: passed through to the helm upgrade command
        :return: the new revision number of the release (0 if not found)
        :raises K8sException: if the instance is unknown or helm fails
        """
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace

        # set namespace
        if not namespace:
            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
            if not instance_info:
                raise K8sException("kdu_instance {} not found".format(kdu_instance))
            namespace = instance_info["namespace"]

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # params to str: values-file fragment plus the temp file to remove
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_uuid, params=params
        )

        # version: split an optional version suffix off the model reference
        kdu_model, version = self._split_version(kdu_model)

        # refresh the repo index so the requested chart version resolves
        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_uuid, repo)

        # build the helm-version-specific upgrade command line
        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
            force,
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task, concurrently with the upgrade
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            # wait for execution task only; the status task is stopped below
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            # non-atomic: run helm synchronously, no concurrent status writes
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status after the command has finished
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="upgrade",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
596
    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        cluster_uuid: str = None,
        kdu_model: str = None,
        atomic: bool = True,
        db_dict: dict = None,
        **kwargs,
    ):
        """Scale a resource in a Helm Chart.

        Args:
            kdu_instance: KDU instance name
            scale: Scale to which to set the resource
            resource_name: Resource name
            total_timeout: The time, in seconds, to wait
            cluster_uuid: The UUID of the cluster
            kdu_model: The chart reference
            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            db_dict: Dictionary for any additional data
            kwargs: Additional parameters

        Returns:
            True if successful, False otherwise

        Raises:
            K8sException: if the instance is unknown or the helm command fails
        """

        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_mgs = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_mgs)

        # look for instance to obtain namespace
        # get_instance_info function calls the sync command
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # version: split an optional version suffix off the model reference
        kdu_model, version = self._split_version(kdu_model)

        # resolve the repository url for this chart in this cluster
        repo_url = await self._find_repo(kdu_model, cluster_uuid)

        # obtain the values key that controls the replica count
        _, replica_str = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        # build the helm-version-specific upgrade command with the new scale
        command = self._get_upgrade_scale_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            scale,
            version,
            atomic,
            replica_str,
            total_timeout,
            resource_name,
            paths["kube_config"],
        )

        self.log.debug("scaling: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task, concurrently with the scaling
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="scale",
                )
            )

            # wait for execution task only; the status task is stopped below
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            # non-atomic: run helm synchronously, no concurrent status writes
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # write final status after the command has finished
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="scale",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return True
718
    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        cluster_uuid: str,
        kdu_model: str,
        **kwargs,
    ) -> int:
        """Get a resource scale count.

        Args:
            cluster_uuid: The UUID of the cluster
            resource_name: Resource name
            kdu_instance: KDU instance name
            kdu_model: The name or path of an Helm Chart
            kwargs: Additional parameters

        Returns:
            Resource instance count

        Raises:
            K8sException: if the instance is unknown or no replica count can
                be determined from the instance nor from the chart defaults
        """

        self.log.debug(
            "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
        )

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, _ = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # first try the replica count currently set on the deployed instance
        replicas = await self._get_replica_count_instance(
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            kubeconfig=paths["kube_config"],
            resource_name=resource_name,
        )

        self.log.debug(
            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
        )

        # Get default value if scale count is not found from provided values
        # Important note: this piece of code shall only be executed in the first scaling operation,
        # since it is expected that the _get_replica_count_instance is able to obtain the number of
        # replicas when a scale operation was already conducted previously for this KDU/resource!
        if replicas is None:
            # fall back to the default replica count in the chart package
            repo_url = await self._find_repo(
                kdu_model=kdu_model, cluster_uuid=cluster_uuid
            )
            replicas, _ = await self._get_replica_count_url(
                kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
            )

            self.log.debug(
                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
                f"{resource_name} obtained: {replicas}"
            )

            # no value from the instance nor from the chart: cannot scale
            if replicas is None:
                msg = "Replica count not found. Cannot be scaled"
                self.log.error(msg)
                raise K8sException(msg)

        return int(replicas)
788
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """Roll back a KDU instance to a previous revision.

        :param cluster_uuid: UUID of the target cluster
        :param kdu_instance: helm release name to roll back
        :param revision: target revision (0 means helm's default behavior)
        :param db_dict: database location info consumed by _store_status
        :return: the new revision number of the release (0 if not found)
        :raises K8sException: if the instance is unknown or helm fails
        """
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # build the helm-version-specific rollback command line
        command = self._get_rollback_command(
            kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
        )

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task, concurrently with the rollback
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
            )
        )

        # wait for execution task only; the status task is stopped below
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status after the command has finished
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
872
873 1     async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
874         """
875         Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
876         (this call should happen after all _terminate-config-primitive_ of the VNF
877         are invoked).
878
879         :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
880         :param kdu_instance: unique name for the KDU instance to be deleted
881         :param kwargs: Additional parameters (None yet)
882         :return: True if successful
883         """
884
885 1         self.log.debug(
886             "uninstall kdu_instance {} from cluster {}".format(
887                 kdu_instance, cluster_uuid
888             )
889         )
890
891         # sync local dir
892 1         self.fs.sync(from_path=cluster_uuid)
893
894         # look for instance to obtain namespace
895 1         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
896 1         if not instance_info:
897 0             self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
898 0             return True
899         # init env, paths
900 1         paths, env = self._init_paths_env(
901             cluster_name=cluster_uuid, create_if_not_exist=True
902         )
903
904         # sync local dir
905 1         self.fs.sync(from_path=cluster_uuid)
906
907 1         command = self._get_uninstall_command(
908             kdu_instance, instance_info["namespace"], paths["kube_config"]
909         )
910 1         output, _rc = await self._local_async_exec(
911             command=command, raise_exception_on_error=True, env=env
912         )
913
914         # sync fs
915 1         self.fs.reverse_sync(from_path=cluster_uuid)
916
917 1         return self._output_to_table(output)
918
919 1     async def instances_list(self, cluster_uuid: str) -> list:
920         """
921         returns a list of deployed releases in a cluster
922
923         :param cluster_uuid: the 'cluster' or 'namespace:cluster'
924         :return:
925         """
926
927 1         self.log.debug("list releases for cluster {}".format(cluster_uuid))
928
929         # sync local dir
930 1         self.fs.sync(from_path=cluster_uuid)
931
932         # execute internal command
933 1         result = await self._instances_list(cluster_uuid)
934
935         # sync fs
936 1         self.fs.reverse_sync(from_path=cluster_uuid)
937
938 1         return result
939
940 1     async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
941 0         instances = await self.instances_list(cluster_uuid=cluster_uuid)
942 0         for instance in instances:
943 0             if instance.get("name") == kdu_instance:
944 0                 return instance
945 0         self.log.debug("Instance {} not found".format(kdu_instance))
946 0         return None
947
948 1     async def upgrade_charm(
949         self,
950         ee_id: str = None,
951         path: str = None,
952         charm_id: str = None,
953         charm_type: str = None,
954         timeout: float = None,
955     ) -> str:
956         """This method upgrade charms in VNFs
957
958         Args:
959             ee_id:  Execution environment id
960             path:   Local path to the charm
961             charm_id:   charm-id
962             charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
963             timeout: (Float)    Timeout for the ns update operation
964
965         Returns:
966             The output of the update operation if status equals to "completed"
967         """
968 0         raise K8sException("KDUs deployed with Helm do not support charm upgrade")
969
970 1     async def exec_primitive(
971         self,
972         cluster_uuid: str = None,
973         kdu_instance: str = None,
974         primitive_name: str = None,
975         timeout: float = 300,
976         params: dict = None,
977         db_dict: dict = None,
978         **kwargs,
979     ) -> str:
980         """Exec primitive (Juju action)
981
982         :param cluster_uuid: The UUID of the cluster or namespace:cluster
983         :param kdu_instance: The unique name of the KDU instance
984         :param primitive_name: Name of action that will be executed
985         :param timeout: Timeout for action execution
986         :param params: Dictionary of all the parameters needed for the action
987         :db_dict: Dictionary for any additional data
988         :param kwargs: Additional parameters (None yet)
989
990         :return: Returns the output of the action
991         """
992 0         raise K8sException(
993             "KDUs deployed with Helm don't support actions "
994             "different from rollback, upgrade and status"
995         )
996
997 1     async def get_services(
998         self, cluster_uuid: str, kdu_instance: str, namespace: str
999     ) -> list:
1000         """
1001         Returns a list of services defined for the specified kdu instance.
1002
1003         :param cluster_uuid: UUID of a K8s cluster known by OSM
1004         :param kdu_instance: unique name for the KDU instance
1005         :param namespace: K8s namespace used by the KDU instance
1006         :return: If successful, it will return a list of services, Each service
1007         can have the following data:
1008         - `name` of the service
1009         - `type` type of service in the k8 cluster
1010         - `ports` List of ports offered by the service, for each port includes at least
1011         name, port, protocol
1012         - `cluster_ip` Internal ip to be used inside k8s cluster
1013         - `external_ip` List of external ips (in case they are available)
1014         """
1015
1016 1         self.log.debug(
1017             "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1018                 cluster_uuid, kdu_instance
1019             )
1020         )
1021
1022         # init env, paths
1023 1         paths, env = self._init_paths_env(
1024             cluster_name=cluster_uuid, create_if_not_exist=True
1025         )
1026
1027         # sync local dir
1028 1         self.fs.sync(from_path=cluster_uuid)
1029
1030         # get list of services names for kdu
1031 1         service_names = await self._get_services(
1032             cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1033         )
1034
1035 1         service_list = []
1036 1         for service in service_names:
1037 1             service = await self._get_service(cluster_uuid, service, namespace)
1038 1             service_list.append(service)
1039
1040         # sync fs
1041 1         self.fs.reverse_sync(from_path=cluster_uuid)
1042
1043 1         return service_list
1044
1045 1     async def get_service(
1046         self, cluster_uuid: str, service_name: str, namespace: str
1047     ) -> object:
1048 1         self.log.debug(
1049             "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1050                 service_name, namespace, cluster_uuid
1051             )
1052         )
1053
1054         # sync local dir
1055 1         self.fs.sync(from_path=cluster_uuid)
1056
1057 1         service = await self._get_service(cluster_uuid, service_name, namespace)
1058
1059         # sync fs
1060 1         self.fs.reverse_sync(from_path=cluster_uuid)
1061
1062 1         return service
1063
1064 1     async def status_kdu(
1065         self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1066     ) -> Union[str, dict]:
1067         """
1068         This call would retrieve tha current state of a given KDU instance. It would be
1069         would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1070         values_ of the configuration parameters applied to a given instance. This call
1071         would be based on the `status` call.
1072
1073         :param cluster_uuid: UUID of a K8s cluster known by OSM
1074         :param kdu_instance: unique name for the KDU instance
1075         :param kwargs: Additional parameters (None yet)
1076         :param yaml_format: if the return shall be returned as an YAML string or as a
1077                                 dictionary
1078         :return: If successful, it will return the following vector of arguments:
1079         - K8s `namespace` in the cluster where the KDU lives
1080         - `state` of the KDU instance. It can be:
1081               - UNKNOWN
1082               - DEPLOYED
1083               - DELETED
1084               - SUPERSEDED
1085               - FAILED or
1086               - DELETING
1087         - List of `resources` (objects) that this release consists of, sorted by kind,
1088           and the status of those resources
1089         - Last `deployment_time`.
1090
1091         """
1092 0         self.log.debug(
1093             "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1094                 cluster_uuid, kdu_instance
1095             )
1096         )
1097
1098         # sync local dir
1099 0         self.fs.sync(from_path=cluster_uuid)
1100
1101         # get instance: needed to obtain namespace
1102 0         instances = await self._instances_list(cluster_id=cluster_uuid)
1103 0         for instance in instances:
1104 0             if instance.get("name") == kdu_instance:
1105 0                 break
1106         else:
1107             # instance does not exist
1108 0             raise K8sException(
1109                 "Instance name: {} not found in cluster: {}".format(
1110                     kdu_instance, cluster_uuid
1111                 )
1112             )
1113
1114 0         status = await self._status_kdu(
1115             cluster_id=cluster_uuid,
1116             kdu_instance=kdu_instance,
1117             namespace=instance["namespace"],
1118             yaml_format=yaml_format,
1119             show_error_log=True,
1120         )
1121
1122         # sync fs
1123 0         self.fs.reverse_sync(from_path=cluster_uuid)
1124
1125 0         return status
1126
1127 1     async def get_values_kdu(
1128         self, kdu_instance: str, namespace: str, kubeconfig: str
1129     ) -> str:
1130 1         self.log.debug("get kdu_instance values {}".format(kdu_instance))
1131
1132 1         return await self._exec_get_command(
1133             get_command="values",
1134             kdu_instance=kdu_instance,
1135             namespace=namespace,
1136             kubeconfig=kubeconfig,
1137         )
1138
1139 1     async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1140         """Method to obtain the Helm Chart package's values
1141
1142         Args:
1143             kdu_model: The name or path of an Helm Chart
1144             repo_url: Helm Chart repository url
1145
1146         Returns:
1147             str: the values of the Helm Chart package
1148         """
1149
1150 1         self.log.debug(
1151             "inspect kdu_model values {} from (optional) repo: {}".format(
1152                 kdu_model, repo_url
1153             )
1154         )
1155
1156 1         return await self._exec_inspect_command(
1157             inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1158         )
1159
1160 1     async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1161 1         self.log.debug(
1162             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1163         )
1164
1165 1         return await self._exec_inspect_command(
1166             inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1167         )
1168
1169 1     async def synchronize_repos(self, cluster_uuid: str):
1170 1         self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1171 1         try:
1172 1             db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1173 1             db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1174
1175 1             local_repo_list = await self.repo_list(cluster_uuid)
1176 1             local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1177
1178 1             deleted_repo_list = []
1179 1             added_repo_dict = {}
1180
1181             # iterate over the list of repos in the database that should be
1182             # added if not present
1183 1             for repo_name, db_repo in db_repo_dict.items():
1184 1                 try:
1185                     # check if it is already present
1186 1                     curr_repo_url = local_repo_dict.get(db_repo["name"])
1187 1                     repo_id = db_repo.get("_id")
1188 1                     if curr_repo_url != db_repo["url"]:
1189 1                         if curr_repo_url:
1190 0                             self.log.debug(
1191                                 "repo {} url changed, delete and and again".format(
1192                                     db_repo["url"]
1193                                 )
1194                             )
1195 0                             await self.repo_remove(cluster_uuid, db_repo["name"])
1196 0                             deleted_repo_list.append(repo_id)
1197
1198                         # add repo
1199 1                         self.log.debug("add repo {}".format(db_repo["name"]))
1200 1                         if "ca_cert" in db_repo:
1201 0                             await self.repo_add(
1202                                 cluster_uuid,
1203                                 db_repo["name"],
1204                                 db_repo["url"],
1205                                 cert=db_repo["ca_cert"],
1206                             )
1207                         else:
1208 1                             await self.repo_add(
1209                                 cluster_uuid,
1210                                 db_repo["name"],
1211                                 db_repo["url"],
1212                             )
1213 1                         added_repo_dict[repo_id] = db_repo["name"]
1214 0                 except Exception as e:
1215 0                     raise K8sException(
1216                         "Error adding repo id: {}, err_msg: {} ".format(
1217                             repo_id, repr(e)
1218                         )
1219                     )
1220
1221             # Delete repos that are present but not in nbi_list
1222 1             for repo_name in local_repo_dict:
1223 1                 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1224 1                     self.log.debug("delete repo {}".format(repo_name))
1225 1                     try:
1226 1                         await self.repo_remove(cluster_uuid, repo_name)
1227 1                         deleted_repo_list.append(repo_name)
1228 0                     except Exception as e:
1229 0                         self.warning(
1230                             "Error deleting repo, name: {}, err_msg: {}".format(
1231                                 repo_name, str(e)
1232                             )
1233                         )
1234
1235 1             return deleted_repo_list, added_repo_dict
1236
1237 0         except K8sException:
1238 0             raise
1239 0         except Exception as e:
1240             # Do not raise errors synchronizing repos
1241 0             self.log.error("Error synchronizing repos: {}".format(e))
1242 0             raise Exception("Error synchronizing repos: {}".format(e))
1243
1244 1     def _get_db_repos_dict(self, repo_ids: list):
1245 1         db_repos_dict = {}
1246 1         for repo_id in repo_ids:
1247 1             db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1248 1             db_repos_dict[db_repo["name"]] = db_repo
1249 1         return db_repos_dict
1250
1251     """
1252     ####################################################################################
1253     ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1254     ####################################################################################
1255     """
1256
    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns base cluster and kube dirs and returns them.
        Also created helm3 dirs according to new directory specification, paths are
        not returned but assigned to helm environment variables

        :param cluster_name: cluster_name
        :param create_if_not_exist: create the directories when they do not exist yet
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
1267
    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization

        :param cluster_id: id of the cluster to initialize
        :param namespace: K8s namespace to use
        :param paths: dictionary of local paths (as produced by _init_paths_env)
        :param env: helm environment variables for the cluster
        """
1273
    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent helm instances list

        :param cluster_id: id of the cluster whose releases are listed
        :return: list of deployed releases
        """
1279
    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        """
        Implements the helm version dependent method to obtain services from a helm instance

        :param cluster_id: id of the cluster the release is deployed in
        :param kdu_instance: name of the Helm release
        :param namespace: K8s namespace of the release
        :param kubeconfig: kubeconfig file path
        """
1285
    @abc.abstractmethod
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """
        Implements the helm version dependent method to obtain status of a helm instance

        :param cluster_id: id of the cluster the release is deployed in
        :param kdu_instance: name of the Helm release
        :param namespace: K8s namespace of the release
        :param yaml_format: if True return a YAML string, otherwise a dictionary
        :param show_error_log: if True log failures of the underlying command
        :return: status of the helm instance
        """
1298
    @abc.abstractmethod
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """
        Obtain command to be executed to install the indicated instance
        (docstring previously said "delete" by copy-paste error)
        """
1314
    @abc.abstractmethod
    def _get_upgrade_scale_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        count,
        version,
        atomic,
        replicas,
        timeout,
        resource_name,
        kubeconfig,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            count (int): Scale count
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replicas: The key under resource_name key where the scale count is stored
                (NOTE(review): docstring previously documented this as `replica_str`;
                confirm exact semantics against the subclass implementation)
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """
1347
    @abc.abstractmethod
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
        force,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """
1377
    @abc.abstractmethod
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """
        Obtain command to be executed to rollback the indicated instance

        :param kdu_instance: name of the Helm release
        :param namespace: K8s namespace of the release
        :param revision: revision number to roll back to
        :param kubeconfig: kubeconfig file path
        """
1385
    @abc.abstractmethod
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """
        Obtain command to be executed to delete the indicated instance

        :param kdu_instance: name of the Helm release
        :param namespace: K8s namespace of the release
        :param kubeconfig: kubeconfig file path
        """
1393
    @abc.abstractmethod
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generates the command to obtain the information about an Helm Chart package
            (´helm show ...´ command)

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of an Helm Chart
            repo_str: Helm Chart repository option string (docstring previously
                documented a non-existent `repo_url` parameter)
            version: constraint with specific version of the Chart to use

        Returns:
            str: the generated Helm Chart command
        """
1410
    @abc.abstractmethod
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance.

        :param get_command: the second part of the command (`helm get <get_command>`)
        :param kdu_instance: name of the Helm release
        :param namespace: K8s namespace of the release
        :param kubeconfig: kubeconfig file path
        """
1416
    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method call to uninstall cluster software for helm. This method is dependent
        of helm version
        For Helm v2 it will be called when Tiller must be uninstalled
        For Helm v3 it does nothing and does not need to be called
        """
1425
    @abc.abstractmethod
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the cluster repos identifiers

        :param cluster_uuid: the 'cluster' or 'namespace:cluster' identifier
        :return: list of repo identifiers configured for the cluster
        """
1432     """
1433     ####################################################################################
1434     ################################### P R I V A T E ##################################
1435     ####################################################################################
1436     """
1437
1438 1     @staticmethod
1439 1     def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1440 0         if os.path.exists(filename):
1441 0             return True
1442         else:
1443 0             msg = "File {} does not exist".format(filename)
1444 0             if exception_if_not_exists:
1445 0                 raise K8sException(msg)
1446
1447 1     @staticmethod
1448 1     def _remove_multiple_spaces(strobj):
1449 0         strobj = strobj.strip()
1450 0         while "  " in strobj:
1451 0             strobj = strobj.replace("  ", " ")
1452 0         return strobj
1453
1454 1     @staticmethod
1455 1     def _output_to_lines(output: str) -> list:
1456 0         output_lines = list()
1457 0         lines = output.splitlines(keepends=False)
1458 0         for line in lines:
1459 0             line = line.strip()
1460 0             if len(line) > 0:
1461 0                 output_lines.append(line)
1462 0         return output_lines
1463
1464 1     @staticmethod
1465 1     def _output_to_table(output: str) -> list:
1466 1         output_table = list()
1467 1         lines = output.splitlines(keepends=False)
1468 1         for line in lines:
1469 0             line = line.replace("\t", " ")
1470 0             line_list = list()
1471 0             output_table.append(line_list)
1472 0             cells = line.split(sep=" ")
1473 0             for cell in cells:
1474 0                 cell = cell.strip()
1475 0                 if len(cell) > 0:
1476 0                     line_list.append(cell)
1477 1         return output_table
1478
1479 1     @staticmethod
1480 1     def _parse_services(output: str) -> list:
1481 0         lines = output.splitlines(keepends=False)
1482 0         services = []
1483 0         for line in lines:
1484 0             line = line.replace("\t", " ")
1485 0             cells = line.split(sep=" ")
1486 0             if len(cells) > 0 and cells[0].startswith("service/"):
1487 0                 elems = cells[0].split(sep="/")
1488 0                 if len(elems) > 1:
1489 0                     services.append(elems[1])
1490 0         return services
1491
1492 1     @staticmethod
1493 1     def _get_deep(dictionary: dict, members: tuple):
1494 1         target = dictionary
1495 1         value = None
1496 1         try:
1497 1             for m in members:
1498 1                 value = target.get(m)
1499 0                 if not value:
1500 0                     return None
1501                 else:
1502 0                     target = value
1503 1         except Exception:
1504 1             pass
1505 1         return value
1506
1507     # find key:value in several lines
1508 1     @staticmethod
1509 1     def _find_in_lines(p_lines: list, p_key: str) -> str:
1510 0         for line in p_lines:
1511 0             try:
1512 0                 if line.startswith(p_key + ":"):
1513 0                     parts = line.split(":")
1514 0                     the_value = parts[1].strip()
1515 0                     return the_value
1516 0             except Exception:
1517                 # ignore it
1518 0                 pass
1519 0         return None
1520
1521 1     @staticmethod
1522 1     def _lower_keys_list(input_list: list):
1523         """
1524         Transform the keys in a list of dictionaries to lower case and returns a new list
1525         of dictionaries
1526         """
1527 0         new_list = []
1528 0         if input_list:
1529 0             for dictionary in input_list:
1530 0                 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1531 0                 new_list.append(new_dict)
1532 0         return new_list
1533
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ) -> (str, int):
        """Run a local command asynchronously under the shared command lock.

        :param command: command line to run (split with shlex, no shell)
        :param raise_exception_on_error: raise K8sException on non-zero return code
        :param show_error_log: log a debug message with output on failure
        :param encode_utf8: post-process the output through an utf-8 encode cycle
        :param env: extra environment variables merged over os.environ
        :return: tuple (output, return_code); ("", -1) on unexpected errors when
            raise_exception_on_error is False
        """
        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command = shlex.split(command)

        # child inherits current environment plus the caller's overrides
        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # serialize subprocess execution across the connector
            async with self.cmd_lock:
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=environ,
                )

                # wait for command terminate
                stdout, stderr = await process.communicate()

                return_code = process.returncode

            # NOTE(review): when both streams have data, stderr overwrites the
            # stdout text — looks intentional for error reporting, but confirm
            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # first, kill the process if it is still running
            if process.returncode is None:
                process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                # best-effort mode: report failure through the return code
                return "", -1
1606
    async def _local_async_exec_pipe(
        self,
        command1: str,
        command2: str,
        raise_exception_on_error: bool = True,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ):
        """Run `command1 | command2` locally, connecting them through an OS pipe.

        Only command2's output and return code are observed; command1's exit
        status is not checked.

        :param command1: producer command (its stdout feeds command2)
        :param command2: consumer command whose output/return code are returned
        :param raise_exception_on_error: raise K8sException on non-zero return code
        :param show_error_log: log a debug message with output on failure
        :param encode_utf8: post-process the output through an utf-8 encode cycle
        :param env: extra environment variables merged over os.environ
        :return: tuple (output, return_code); ("", -1) on unexpected errors when
            raise_exception_on_error is False
        """
        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        # child inherits current environment plus the caller's overrides
        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # serialize subprocess execution across the connector
            async with self.cmd_lock:
                # pipe: command1 writes, command2 reads; close our fd copies
                # after handing them to the children so EOF propagates
                read, write = os.pipe()
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
                os.close(read)
                stdout, stderr = await process_2.communicate()

                return_code = process_2.returncode

            # NOTE(review): when both streams have data, stderr overwrites the
            # stdout text — same quirk as _local_async_exec
            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # first, kill the processes if they are still running
            for process in (process_1, process_2):
                if process.returncode is None:
                    process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                # best-effort mode: report failure through the return code
                return "", -1
1684
1685 1     async def _get_service(self, cluster_id, service_name, namespace):
1686         """
1687         Obtains the data of the specified service in the k8cluster.
1688
1689         :param cluster_id: id of a K8s cluster known by OSM
1690         :param service_name: name of the K8s service in the specified namespace
1691         :param namespace: K8s namespace used by the KDU instance
1692         :return: If successful, it will return a service with the following data:
1693         - `name` of the service
1694         - `type` type of service in the k8 cluster
1695         - `ports` List of ports offered by the service, for each port includes at least
1696         name, port, protocol
1697         - `cluster_ip` Internal ip to be used inside k8s cluster
1698         - `external_ip` List of external ips (in case they are available)
1699         """
1700
1701         # init config, env
1702 1         paths, env = self._init_paths_env(
1703             cluster_name=cluster_id, create_if_not_exist=True
1704         )
1705
1706 1         command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1707             self.kubectl_command, paths["kube_config"], namespace, service_name
1708         )
1709
1710 1         output, _rc = await self._local_async_exec(
1711             command=command, raise_exception_on_error=True, env=env
1712         )
1713
1714 1         data = yaml.load(output, Loader=yaml.SafeLoader)
1715
1716 1         service = {
1717             "name": service_name,
1718             "type": self._get_deep(data, ("spec", "type")),
1719             "ports": self._get_deep(data, ("spec", "ports")),
1720             "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1721         }
1722 1         if service["type"] == "LoadBalancer":
1723 0             ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1724 0             ip_list = [elem["ip"] for elem in ip_map_list]
1725 0             service["external_ip"] = ip_list
1726
1727 1         return service
1728
1729 1     async def _exec_get_command(
1730         self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1731     ):
1732         """Obtains information about the kdu instance."""
1733
1734 1         full_command = self._get_get_command(
1735             get_command, kdu_instance, namespace, kubeconfig
1736         )
1737
1738 1         output, _rc = await self._local_async_exec(command=full_command)
1739
1740 1         return output
1741
1742 1     async def _exec_inspect_command(
1743         self, inspect_command: str, kdu_model: str, repo_url: str = None
1744     ):
1745         """Obtains information about an Helm Chart package (´helm show´ command)
1746
1747         Args:
1748             inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1749             kdu_model: The name or path of an Helm Chart
1750             repo_url: Helm Chart repository url
1751
1752         Returns:
1753             str: the requested info about the Helm Chart package
1754         """
1755
1756 1         repo_str = ""
1757 1         if repo_url:
1758 1             repo_str = " --repo {}".format(repo_url)
1759
1760             # Obtain the Chart's name and store it in the var kdu_model
1761 1             kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1762
1763 1         kdu_model, version = self._split_version(kdu_model)
1764 1         if version:
1765 1             version_str = "--version {}".format(version)
1766         else:
1767 0             version_str = ""
1768
1769 1         full_command = self._get_inspect_command(
1770             show_command=inspect_command,
1771             kdu_model=kdu_model,
1772             repo_str=repo_str,
1773             version=version_str,
1774         )
1775
1776 1         output, _ = await self._local_async_exec(command=full_command)
1777
1778 1         return output
1779
1780 1     async def _get_replica_count_url(
1781         self,
1782         kdu_model: str,
1783         repo_url: str = None,
1784         resource_name: str = None,
1785     ) -> (int, str):
1786         """Get the replica count value in the Helm Chart Values.
1787
1788         Args:
1789             kdu_model: The name or path of an Helm Chart
1790             repo_url: Helm Chart repository url
1791             resource_name: Resource name
1792
1793         Returns:
1794             A tuple with:
1795             - The number of replicas of the specific instance; if not found, returns None; and
1796             - The string corresponding to the replica count key in the Helm values
1797         """
1798
1799 1         kdu_values = yaml.load(
1800             await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1801             Loader=yaml.SafeLoader,
1802         )
1803
1804 1         self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1805
1806 1         if not kdu_values:
1807 0             raise K8sException(
1808                 "kdu_values not found for kdu_model {}".format(kdu_model)
1809             )
1810
1811 1         if resource_name:
1812 1             kdu_values = kdu_values.get(resource_name, None)
1813
1814 1         if not kdu_values:
1815 0             msg = "resource {} not found in the values in model {}".format(
1816                 resource_name, kdu_model
1817             )
1818 0             self.log.error(msg)
1819 0             raise K8sException(msg)
1820
1821 1         duplicate_check = False
1822
1823 1         replica_str = ""
1824 1         replicas = None
1825
1826 1         if kdu_values.get("replicaCount") is not None:
1827 1             replicas = kdu_values["replicaCount"]
1828 1             replica_str = "replicaCount"
1829 1         elif kdu_values.get("replicas") is not None:
1830 1             duplicate_check = True
1831 1             replicas = kdu_values["replicas"]
1832 1             replica_str = "replicas"
1833         else:
1834 0             if resource_name:
1835 0                 msg = (
1836                     "replicaCount or replicas not found in the resource"
1837                     "{} values in model {}. Cannot be scaled".format(
1838                         resource_name, kdu_model
1839                     )
1840                 )
1841             else:
1842 0                 msg = (
1843                     "replicaCount or replicas not found in the values"
1844                     "in model {}. Cannot be scaled".format(kdu_model)
1845                 )
1846 0             self.log.error(msg)
1847 0             raise K8sException(msg)
1848
1849         # Control if replicas and replicaCount exists at the same time
1850 1         msg = "replicaCount and replicas are exists at the same time"
1851 1         if duplicate_check:
1852 1             if "replicaCount" in kdu_values:
1853 0                 self.log.error(msg)
1854 0                 raise K8sException(msg)
1855         else:
1856 1             if "replicas" in kdu_values:
1857 0                 self.log.error(msg)
1858 0                 raise K8sException(msg)
1859
1860 1         return replicas, replica_str
1861
1862 1     async def _get_replica_count_instance(
1863         self,
1864         kdu_instance: str,
1865         namespace: str,
1866         kubeconfig: str,
1867         resource_name: str = None,
1868     ) -> int:
1869         """Get the replica count value in the instance.
1870
1871         Args:
1872             kdu_instance: The name of the KDU instance
1873             namespace: KDU instance namespace
1874             kubeconfig:
1875             resource_name: Resource name
1876
1877         Returns:
1878             The number of replicas of the specific instance; if not found, returns None
1879         """
1880
1881 0         kdu_values = yaml.load(
1882             await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1883             Loader=yaml.SafeLoader,
1884         )
1885
1886 0         self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1887
1888 0         replicas = None
1889
1890 0         if kdu_values:
1891 0             resource_values = (
1892                 kdu_values.get(resource_name, None) if resource_name else None
1893             )
1894
1895 0             for replica_str in ("replicaCount", "replicas"):
1896 0                 if resource_values:
1897 0                     replicas = resource_values.get(replica_str)
1898                 else:
1899 0                     replicas = kdu_values.get(replica_str)
1900
1901 0                 if replicas is not None:
1902 0                     break
1903
1904 0         return replicas
1905
1906 1     async def _store_status(
1907         self,
1908         cluster_id: str,
1909         operation: str,
1910         kdu_instance: str,
1911         namespace: str = None,
1912         db_dict: dict = None,
1913     ) -> None:
1914         """
1915         Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1916
1917         :param cluster_id (str): the cluster where the KDU instance is deployed
1918         :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1919         :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1920         :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1921         :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1922         values for the keys:
1923             - "collection": The Mongo DB collection to write to
1924             - "filter": The query filter to use in the update process
1925             - "path": The dot separated keys which targets the object to be updated
1926         Defaults to None.
1927         """
1928
1929 1         try:
1930 1             detailed_status = await self._status_kdu(
1931                 cluster_id=cluster_id,
1932                 kdu_instance=kdu_instance,
1933                 yaml_format=False,
1934                 namespace=namespace,
1935             )
1936
1937 1             status = detailed_status.get("info").get("description")
1938 1             self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1939
1940             # write status to db
1941 1             result = await self.write_app_status_to_db(
1942                 db_dict=db_dict,
1943                 status=str(status),
1944                 detailed_status=str(detailed_status),
1945                 operation=operation,
1946             )
1947
1948 1             if not result:
1949 0                 self.log.info("Error writing in database. Task exiting...")
1950
1951 0         except asyncio.CancelledError as e:
1952 0             self.log.warning(
1953                 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1954             )
1955 0         except Exception as e:
1956 0             self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1957
1958     # params for use in -f file
1959     # returns values file option and filename (in order to delete it at the end)
1960 1     def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1961 1         if params and len(params) > 0:
1962 0             self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1963
1964 0             def get_random_number():
1965 0                 r = random.randrange(start=1, stop=99999999)
1966 0                 s = str(r)
1967 0                 while len(s) < 10:
1968 0                     s = "0" + s
1969 0                 return s
1970
1971 0             params2 = dict()
1972 0             for key in params:
1973 0                 value = params.get(key)
1974 0                 if "!!yaml" in str(value):
1975 0                     value = yaml.safe_load(value[7:])
1976 0                 params2[key] = value
1977
1978 0             values_file = get_random_number() + ".yaml"
1979 0             with open(values_file, "w") as stream:
1980 0                 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1981
1982 0             return "-f {}".format(values_file), values_file
1983
1984 1         return "", None
1985
1986     # params for use in --set option
1987 1     @staticmethod
1988 1     def _params_to_set_option(params: dict) -> str:
1989 1         params_str = ""
1990 1         if params and len(params) > 0:
1991 1             start = True
1992 1             for key in params:
1993 1                 value = params.get(key, None)
1994 1                 if value is not None:
1995 1                     if start:
1996 1                         params_str += "--set "
1997 1                         start = False
1998                     else:
1999 0                         params_str += ","
2000 1                     params_str += "{}={}".format(key, value)
2001 1         return params_str
2002
2003 1     @staticmethod
2004 1     def generate_kdu_instance_name(**kwargs):
2005 0         chart_name = kwargs["kdu_model"]
2006         # check embeded chart (file or dir)
2007 0         if chart_name.startswith("/"):
2008             # extract file or directory name
2009 0             chart_name = chart_name[chart_name.rfind("/") + 1 :]
2010         # check URL
2011 0         elif "://" in chart_name:
2012             # extract last portion of URL
2013 0             chart_name = chart_name[chart_name.rfind("/") + 1 :]
2014
2015 0         name = ""
2016 0         for c in chart_name:
2017 0             if c.isalpha() or c.isnumeric():
2018 0                 name += c
2019             else:
2020 0                 name += "-"
2021 0         if len(name) > 35:
2022 0             name = name[0:35]
2023
2024         # if does not start with alpha character, prefix 'a'
2025 0         if not name[0].isalpha():
2026 0             name = "a" + name
2027
2028 0         name += "-"
2029
2030 0         def get_random_number():
2031 0             r = random.randrange(start=1, stop=99999999)
2032 0             s = str(r)
2033 0             s = s.rjust(10, "0")
2034 0             return s
2035
2036 0         name = name + get_random_number()
2037 0         return name.lower()
2038
2039 1     def _split_version(self, kdu_model: str) -> (str, str):
2040 1         version = None
2041 1         if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
2042 1             parts = kdu_model.split(sep=":")
2043 1             if len(parts) == 2:
2044 1                 version = str(parts[1])
2045 1                 kdu_model = parts[0]
2046 1         return kdu_model, version
2047
2048 1     def _split_repo(self, kdu_model: str) -> (str, str):
2049         """Obtain the Helm Chart's repository and Chart's names from the KDU model
2050
2051         Args:
2052             kdu_model (str): Associated KDU model
2053
2054         Returns:
2055             (str, str): Tuple with the Chart name in index 0, and the repo name
2056                         in index 2; if there was a problem finding them, return None
2057                         for both
2058         """
2059
2060 1         chart_name = None
2061 1         repo_name = None
2062
2063 1         idx = kdu_model.find("/")
2064 1         if idx >= 0:
2065 1             chart_name = kdu_model[idx + 1 :]
2066 1             repo_name = kdu_model[:idx]
2067
2068 1         return chart_name, repo_name
2069
2070 1     async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2071         """Obtain the Helm repository for an Helm Chart
2072
2073         Args:
2074             kdu_model (str): the KDU model associated with the Helm Chart instantiation
2075             cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2076
2077         Returns:
2078             str: the repository URL; if Helm Chart is a local one, the function returns None
2079         """
2080
2081 1         _, repo_name = self._split_repo(kdu_model=kdu_model)
2082
2083 1         repo_url = None
2084 1         if repo_name:
2085             # Find repository link
2086 1             local_repo_list = await self.repo_list(cluster_uuid)
2087 1             for repo in local_repo_list:
2088 1                 if repo["name"] == repo_name:
2089 1                     repo_url = repo["url"]
2090 1                     break  # it is not necessary to continue the loop if the repo link was found...
2091
2092 1         return repo_url
2093
2094 1     async def create_certificate(
2095         self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2096     ):
2097 0         paths, env = self._init_paths_env(
2098             cluster_name=cluster_uuid, create_if_not_exist=True
2099         )
2100 0         kubectl = Kubectl(config_file=paths["kube_config"])
2101 0         await kubectl.create_certificate(
2102             namespace=namespace,
2103             name=name,
2104             dns_prefix=dns_prefix,
2105             secret_name=secret_name,
2106             usages=[usage],
2107             issuer_name="ca-issuer",
2108         )
2109
2110 1     async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2111 0         paths, env = self._init_paths_env(
2112             cluster_name=cluster_uuid, create_if_not_exist=True
2113         )
2114 0         kubectl = Kubectl(config_file=paths["kube_config"])
2115 0         await kubectl.delete_certificate(namespace, certificate_name)