Code Coverage


k8s_helm_conn.py


File Coverage summary

Name               Classes      Lines         Conditionals
k8s_helm_conn.py   0% (0/1)     0% (0/575)    100% (0/0)

Coverage Breakdown by Class

Name               Lines         Conditionals
k8s_helm_conn.py   0% (0/575)    N/A

Source

n2vc/k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 0 import asyncio
24 0 import os
25 0 import random
26 0 import shutil
27 0 import subprocess
28 0 import time
29 0 from uuid import uuid4
30
31 0 from n2vc.exceptions import K8sException
32 0 from n2vc.k8s_conn import K8sConnector
33 0 import yaml
34
35
36 0 class K8sHelmConnector(K8sConnector):
37
38     """
39     ####################################################################################
40     ################################### P U B L I C ####################################
41     ####################################################################################
42     """
43
44 0     def __init__(
45         self,
46         fs: object,
47         db: object,
48         kubectl_command: str = "/usr/bin/kubectl",
49         helm_command: str = "/usr/bin/helm",
50         log: object = None,
51         on_update_db=None,
52     ):
53         """
54
55         :param fs: file system for kubernetes and helm configuration
56         :param db: database object to write current operation status
57         :param kubectl_command: path to kubectl executable
58         :param helm_command: path to helm executable
59         :param log: logger
60         :param on_update_db: callback called when k8s connector updates database
61         """
62
63         # parent class
64 0         K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
65
66 0         self.log.info("Initializing K8S Helm connector")
67
68         # random numbers for release name generation
69 0         random.seed(time.time())
70
71         # the file system
72 0         self.fs = fs
73
74         # exception if kubectl is not installed
75 0         self.kubectl_command = kubectl_command
76 0         self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
77
78         # exception if helm is not installed
79 0         self._helm_command = helm_command
80 0         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
81
82         # initialize helm client-only
83 0         self.log.debug("Initializing helm client-only...")
84 0         command = "{} init --client-only".format(self._helm_command)
85 0         try:
86 0             asyncio.ensure_future(
87                 self._local_async_exec(command=command, raise_exception_on_error=False)
88             )
89             # loop = asyncio.get_event_loop()
90             # loop.run_until_complete(self._local_async_exec(command=command,
91             # raise_exception_on_error=False))
92 0         except Exception as e:
93 0             self.log.warning(
94                 msg="helm init failed (it may have been already initialized): {}".format(e)
95             )
96
97 0         self.log.info("K8S Helm connector initialized")
98
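
A minimal construction sketch, assuming kubectl and helm are installed at their default paths. The stub filesystem object is hypothetical and only provides the .path attribute this class actually reads; a real deployment passes the OSM common filesystem and database objects instead.

import logging
from n2vc.k8s_helm_conn import K8sHelmConnector

class StubFs:  # hypothetical stand-in for the OSM common filesystem object
    path = "/tmp/osm-k8s"

connector = K8sHelmConnector(
    fs=StubFs(),
    db=None,  # a real OSM database object is required for status writes
    log=logging.getLogger("k8s_helm"),
)
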
99 0     async def init_env(
100         self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
101     ) -> (str, bool):
102         """
103         It prepares a given K8s cluster environment to run Charts on both sides:
104             client (OSM)
105             server (Tiller)
106
107         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
108             '.kube/config'
109         :param namespace: optional namespace to be used for helm. By default,
110             'kube-system' will be used
111         :param reuse_cluster_uuid: existing cluster uuid for reuse
112         :return: uuid of the K8s cluster and True if connector has installed some
113             software in the cluster
114         (on error, an exception will be raised)
115         """
116
117 0         cluster_uuid = reuse_cluster_uuid
118 0         if not cluster_uuid:
119 0             cluster_uuid = str(uuid4())
120
121 0         self.log.debug("Initializing K8S environment. namespace: {}".format(namespace))
122
123         # create config filename
124 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
125             cluster_name=cluster_uuid, create_if_not_exist=True
126         )
127 0         f = open(config_filename, "w")
128 0         f.write(k8s_creds)
129 0         f.close()
130
131         # check if tiller pod is up in cluster
132 0         command = "{} --kubeconfig={} --namespace={} get deployments".format(
133             self.kubectl_command, config_filename, namespace
134         )
135 0         output, _rc = await self._local_async_exec(
136             command=command, raise_exception_on_error=True
137         )
138
139 0         output_table = K8sHelmConnector._output_to_table(output=output)
140
141         # find 'tiller' pod in all pods
142 0         already_initialized = False
143 0         try:
144 0             for row in output_table:
145 0                 if row[0].startswith("tiller-deploy"):
146 0                     already_initialized = True
147 0                     break
148 0         except Exception:
149 0             pass
150
151         # helm init
152 0         n2vc_installed_sw = False
153 0         if not already_initialized:
154 0             self.log.info(
155                 "Initializing helm in client and server: {}".format(cluster_uuid)
156             )
157 0             command = "{} --kubeconfig={} --tiller-namespace={} --home={} init".format(
158                 self._helm_command, config_filename, namespace, helm_dir
159             )
160 0             output, _rc = await self._local_async_exec(
161                 command=command, raise_exception_on_error=True
162             )
163 0             n2vc_installed_sw = True
164         else:
165             # check client helm installation
166 0             check_file = helm_dir + "/repository/repositories.yaml"
167 0             if not self._check_file_exists(
168                 filename=check_file, exception_if_not_exists=False
169             ):
170 0                 self.log.info("Initializing helm in client: {}".format(cluster_uuid))
171 0                 command = (
172                     "{} --kubeconfig={} --tiller-namespace={} "
173                     "--home={} init --client-only"
174                 ).format(self._helm_command, config_filename, namespace, helm_dir)
175 0                 output, _rc = await self._local_async_exec(
176                     command=command, raise_exception_on_error=True
177                 )
178             else:
179 0                 self.log.info("Helm client already initialized")
180
181 0         self.log.info("Cluster initialized {}".format(cluster_uuid))
182
183 0         return cluster_uuid, n2vc_installed_sw
184
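
A hedged usage sketch for init_env: it is a coroutine and must run under an event loop; kubeconfig_text stands in for the operator-supplied '.kube/config' contents.

import asyncio

async def bootstrap(connector, kubeconfig_text: str):
    # returns the cluster uuid plus whether n2vc installed Tiller itself
    cluster_uuid, installed_sw = await connector.init_env(
        k8s_creds=kubeconfig_text, namespace="kube-system"
    )
    return cluster_uuid, installed_sw

# asyncio.get_event_loop().run_until_complete(bootstrap(connector, kubeconfig_text))
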
185 0     async def repo_add(
186         self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
187     ):
188
189 0         self.log.debug("adding {} repository {}. URL: {}".format(repo_type, name, url))
190
191         # config filename
192 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
193             cluster_name=cluster_uuid, create_if_not_exist=True
194         )
195
196         # helm repo update
197 0         command = "{} --kubeconfig={} --home={} repo update".format(
198             self._helm_command, config_filename, helm_dir
199         )
200 0         self.log.debug("updating repo: {}".format(command))
201 0         await self._local_async_exec(command=command, raise_exception_on_error=False)
202
203         # helm repo add name url
204 0         command = "{} --kubeconfig={} --home={} repo add {} {}".format(
205             self._helm_command, config_filename, helm_dir, name, url
206         )
207 0         self.log.debug("adding repo: {}".format(command))
208 0         await self._local_async_exec(command=command, raise_exception_on_error=True)
209
210 0     async def repo_list(self, cluster_uuid: str) -> list:
211         """
212         Get the list of registered repositories
213
214         :return: list of registered repositories: [ (name, url) .... ]
215         """
216
217 0         self.log.debug("list repositories for cluster {}".format(cluster_uuid))
218
219         # config filename
220 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
221             cluster_name=cluster_uuid, create_if_not_exist=True
222         )
223
224 0         command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
225             self._helm_command, config_filename, helm_dir
226         )
227
228 0         output, _rc = await self._local_async_exec(
229             command=command, raise_exception_on_error=True
230         )
231 0         if output and len(output) > 0:
232 0             return yaml.load(output, Loader=yaml.SafeLoader)
233         else:
234 0             return []
235
236 0     async def repo_remove(self, cluster_uuid: str, name: str):
237         """
238         Remove a repository from OSM
239
240         :param cluster_uuid: the cluster
241         :param name: repo name in OSM
242         :return: True if successful
243         """
244
245 0         self.log.debug("remove repository {} from cluster {}".format(name, cluster_uuid))
246
247         # config filename
248 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
249             cluster_name=cluster_uuid, create_if_not_exist=True
250         )
251
252 0         command = "{} --kubeconfig={} --home={} repo remove {}".format(
253             self._helm_command, config_filename, helm_dir, name
254         )
255
256 0         await self._local_async_exec(command=command, raise_exception_on_error=True)
257
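
The three repo calls compose into the usual add/list/remove lifecycle; a sketch follows, where the bitnami URL is only an example repository, not something this module mandates.

async def repo_lifecycle(connector, cluster_uuid: str):
    await connector.repo_add(
        cluster_uuid, "bitnami", "https://charts.bitnami.com/bitnami"
    )
    repos = await connector.repo_list(cluster_uuid)  # parsed from 'helm repo list --output yaml'
    print(repos)
    await connector.repo_remove(cluster_uuid, "bitnami")
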
258 0     async def reset(
259         self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
260     ) -> bool:
261
262 0         self.log.debug(
263             "Resetting K8s environment. cluster uuid: {}".format(cluster_uuid)
264         )
265
266         # get kube and helm directories
267 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
268             cluster_name=cluster_uuid, create_if_not_exist=False
269         )
270
271         # uninstall releases if needed
272 0         releases = await self.instances_list(cluster_uuid=cluster_uuid)
273 0         if len(releases) > 0:
274 0             if force:
275 0                 for r in releases:
276 0                     try:
277 0                         kdu_instance = r.get("Name")
278 0                         chart = r.get("Chart")
279 0                         self.log.debug(
280                             "Uninstalling {} -> {}".format(chart, kdu_instance)
281                         )
282 0                         await self.uninstall(
283                             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
284                         )
285 0                     except Exception as e:
286 0                         self.log.error(
287                             "Error uninstalling release {}: {}".format(kdu_instance, e)
288                         )
289             else:
290 0                 msg = (
291                     "Cluster has releases and 'force' is not set; cannot "
292                     "reset K8s environment. Cluster uuid: {}"
293                 ).format(cluster_uuid)
294 0                 self.log.error(msg)
295 0                 raise K8sException(msg)
296
297 0         if uninstall_sw:
298
299 0             self.log.debug("Uninstalling tiller from cluster {}".format(cluster_uuid))
300
301             # find namespace for tiller pod
302 0             command = "{} --kubeconfig={} get deployments --all-namespaces".format(
303                 self.kubectl_command, config_filename
304             )
305 0             output, _rc = await self._local_async_exec(
306                 command=command, raise_exception_on_error=False
307             )
308 0             output_table = K8sHelmConnector._output_to_table(output=output)
309 0             namespace = None
310 0             for r in output_table:
311 0                 try:
312 0                     if "tiller-deploy" in r[1]:
313 0                         namespace = r[0]
314 0                         break
315 0                 except Exception:
316 0                     pass
317             else:
318 0                 msg = "Tiller deployment not found in cluster {}".format(cluster_uuid)
319 0                 self.log.error(msg)
320
321 0             self.log.debug("namespace for tiller: {}".format(namespace))
322
323 0             force_str = "--force"
324
325 0             if namespace:
326                 # delete tiller deployment
327 0                 self.log.debug(
328                     "Deleting tiller deployment for cluster {}, namespace {}".format(
329                         cluster_uuid, namespace
330                     )
331                 )
332 0                 command = (
333                     "{} --namespace {} --kubeconfig={} {} delete deployment "
334                     "tiller-deploy"
335                 ).format(self.kubectl_command, namespace, config_filename, force_str)
336 0                 await self._local_async_exec(
337                     command=command, raise_exception_on_error=False
338                 )
339
340                 # uninstall tiller from cluster
341 0                 self.log.debug(
342                     "Uninstalling tiller from cluster {}".format(cluster_uuid)
343                 )
344 0                 command = "{} --kubeconfig={} --home={} reset".format(
345                     self._helm_command, config_filename, helm_dir
346                 )
347 0                 self.log.debug("resetting: {}".format(command))
348 0                 output, _rc = await self._local_async_exec(
349                     command=command, raise_exception_on_error=True
350                 )
351             else:
352 0                 self.log.debug("namespace not found")
353
354         # delete cluster directory
355 0         direct = self.fs.path + "/" + cluster_uuid
356 0         self.log.debug("Removing directory {}".format(direct))
357 0         shutil.rmtree(direct, ignore_errors=True)
358
359 0         return True
360
361 0     async def install(
362         self,
363         cluster_uuid: str,
364         kdu_model: str,
365         atomic: bool = True,
366         timeout: float = 300,
367         params: dict = None,
368         db_dict: dict = None,
369         kdu_name: str = None,
370         namespace: str = None,
371     ):
372
373 0         self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
374
375         # config filename
376 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
377             cluster_name=cluster_uuid, create_if_not_exist=True
378         )
379
380         # params to str
381         # params_str = K8sHelmConnector._params_to_set_option(params)
382 0         params_str, file_to_delete = self._params_to_file_option(
383             cluster_uuid=cluster_uuid, params=params
384         )
385
386 0         timeout_str = ""
387 0         if timeout:
388 0             timeout_str = "--timeout {}".format(timeout)
389
390         # atomic
391 0         atomic_str = ""
392 0         if atomic:
393 0             atomic_str = "--atomic"
394         # namespace
395 0         namespace_str = ""
396 0         if namespace:
397 0             namespace_str = "--namespace {}".format(namespace)
398
399         # version
400 0         version_str = ""
401 0         if ":" in kdu_model:
402 0             parts = kdu_model.split(sep=":")
403 0             if len(parts) == 2:
404 0                 version_str = "--version {}".format(parts[1])
405 0                 kdu_model = parts[0]
406
407         # generate a name for the release, then check whether it already exists
408 0         kdu_instance = None
409 0         while kdu_instance is None:
410 0             kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
411 0             try:
412 0                 result = await self._status_kdu(
413                     cluster_uuid=cluster_uuid,
414                     kdu_instance=kdu_instance,
415                     show_error_log=False,
416                 )
417 0                 if result is not None:
418                     # instance already exists: generate a new one
419 0                     kdu_instance = None
420 0             except K8sException:
421 0                 pass
422
423         # helm install
424 0         command = (
425             "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
426             "{params} {timeout} --name={name} {ns} {model} {ver}".format(
427                 helm=self._helm_command,
428                 atomic=atomic_str,
429                 config=config_filename,
430                 dir=helm_dir,
431                 params=params_str,
432                 timeout=timeout_str,
433                 name=kdu_instance,
434                 ns=namespace_str,
435                 model=kdu_model,
436                 ver=version_str,
437             )
438         )
439 0         self.log.debug("installing: {}".format(command))
440
441 0         if atomic:
442             # exec helm in a task
443 0             exec_task = asyncio.ensure_future(
444                 coro_or_future=self._local_async_exec(
445                     command=command, raise_exception_on_error=False
446                 )
447             )
448
449             # write status in another task
450 0             status_task = asyncio.ensure_future(
451                 coro_or_future=self._store_status(
452                     cluster_uuid=cluster_uuid,
453                     kdu_instance=kdu_instance,
454                     db_dict=db_dict,
455                     operation="install",
456                     run_once=False,
457                 )
458             )
459
460             # wait for execution task
461 0             await asyncio.wait([exec_task])
462
463             # cancel status task
464 0             status_task.cancel()
465
466 0             output, rc = exec_task.result()
467
468         else:
469
470 0             output, rc = await self._local_async_exec(
471                 command=command, raise_exception_on_error=False
472             )
473
474         # remove temporary values yaml file
475 0         if file_to_delete:
476 0             os.remove(file_to_delete)
477
478         # write final status
479 0         await self._store_status(
480             cluster_uuid=cluster_uuid,
481             kdu_instance=kdu_instance,
482             db_dict=db_dict,
483             operation="install",
484             run_once=True,
485             check_every=0,
486         )
487
488 0         if rc != 0:
489 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
490 0             self.log.error(msg)
491 0             raise K8sException(msg)
492
493 0         self.log.debug("Returning kdu_instance {}".format(kdu_instance))
494 0         return kdu_instance
495
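
A sketch of a typical install call. The 'chart:version' suffix is split into a --version flag as shown above, and db_dict drives the periodic status writes performed by the background _store_status task; the db_dict shape shown is an assumption about what write_app_status_to_db expects, with placeholder values.

async def deploy(connector, cluster_uuid: str) -> str:
    kdu_instance = await connector.install(
        cluster_uuid=cluster_uuid,
        kdu_model="stable/openldap:1.2.2",  # example chart reference with optional version
        atomic=True,
        timeout=300,
        params={"replicaCount": 2},
        db_dict={"collection": "nsrs", "filter": {"_id": "..."}, "path": "..."},
    )
    return kdu_instance
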
496 0     async def instances_list(self, cluster_uuid: str) -> list:
497         """
498         returns a list of deployed releases in a cluster
499
500         :param cluster_uuid: the cluster
501         :return:
502         """
503
504 0         self.log.debug("list releases for cluster {}".format(cluster_uuid))
505
506         # config filename
507 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
508             cluster_name=cluster_uuid, create_if_not_exist=True
509         )
510
511 0         command = "{} --kubeconfig={} --home={} list --output yaml".format(
512             self._helm_command, config_filename, helm_dir
513         )
514
515 0         output, _rc = await self._local_async_exec(
516             command=command, raise_exception_on_error=True
517         )
518
519 0         if output and len(output) > 0:
520 0             return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
521         else:
522 0             return []
523
524 0     async def upgrade(
525         self,
526         cluster_uuid: str,
527         kdu_instance: str,
528         kdu_model: str = None,
529         atomic: bool = True,
530         timeout: float = 300,
531         params: dict = None,
532         db_dict: dict = None,
533     ):
534
535 0         self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
536
537         # config filename
538 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
539             cluster_name=cluster_uuid, create_if_not_exist=True
540         )
541
542         # params to str
543         # params_str = K8sHelmConnector._params_to_set_option(params)
544 0         params_str, file_to_delete = self._params_to_file_option(
545             cluster_uuid=cluster_uuid, params=params
546         )
547
548 0         timeout_str = ""
549 0         if timeout:
550 0             timeout_str = "--timeout {}".format(timeout)
551
552         # atomic
553 0         atomic_str = ""
554 0         if atomic:
555 0             atomic_str = "--atomic"
556
557         # version
558 0         version_str = ""
559 0         if kdu_model and ":" in kdu_model:
560 0             parts = kdu_model.split(sep=":")
561 0             if len(parts) == 2:
562 0                 version_str = "--version {}".format(parts[1])
563 0                 kdu_model = parts[0]
564
565         # helm upgrade
566 0         command = (
567             "{} upgrade {} --output yaml --kubeconfig={} " "--home={} {} {} {} {} {}"
568         ).format(
569             self._helm_command,
570             atomic_str,
571             config_filename,
572             helm_dir,
573             params_str,
574             timeout_str,
575             kdu_instance,
576             kdu_model,
577             version_str,
578         )
579 0         self.log.debug("upgrading: {}".format(command))
580
581 0         if atomic:
582
583             # exec helm in a task
584 0             exec_task = asyncio.ensure_future(
585                 coro_or_future=self._local_async_exec(
586                     command=command, raise_exception_on_error=False
587                 )
588             )
589             # write status in another task
590 0             status_task = asyncio.ensure_future(
591                 coro_or_future=self._store_status(
592                     cluster_uuid=cluster_uuid,
593                     kdu_instance=kdu_instance,
594                     db_dict=db_dict,
595                     operation="upgrade",
596                     run_once=False,
597                 )
598             )
599
600             # wait for execution task
601 0             await asyncio.wait([exec_task])
602
603             # cancel status task
604 0             status_task.cancel()
605 0             output, rc = exec_task.result()
606
607         else:
608
609 0             output, rc = await self._local_async_exec(
610                 command=command, raise_exception_on_error=False
611             )
612
613         # remove temporary values yaml file
614 0         if file_to_delete:
615 0             os.remove(file_to_delete)
616
617         # write final status
618 0         await self._store_status(
619             cluster_uuid=cluster_uuid,
620             kdu_instance=kdu_instance,
621             db_dict=db_dict,
622             operation="upgrade",
623             run_once=True,
624             check_every=0,
625         )
626
627 0         if rc != 0:
628 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
629 0             self.log.error(msg)
630 0             raise K8sException(msg)
631
632         # return new revision number
633 0         instance = await self.get_instance_info(
634             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
635         )
636 0         if instance:
637 0             revision = int(instance.get("Revision"))
638 0             self.log.debug("New revision: {}".format(revision))
639 0             return revision
640         else:
641 0             return 0
642
643 0     async def rollback(
644         self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
645     ):
646
647 0         self.log.debug(
648             "rollback kdu_instance {} to revision {} from cluster {}".format(
649                 kdu_instance, revision, cluster_uuid
650             )
651         )
652
653         # config filename
654 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
655             cluster_name=cluster_uuid, create_if_not_exist=True
656         )
657
658 0         command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
659             self._helm_command, config_filename, helm_dir, kdu_instance, revision
660         )
661
662         # exec helm in a task
663 0         exec_task = asyncio.ensure_future(
664             coro_or_future=self._local_async_exec(
665                 command=command, raise_exception_on_error=False
666             )
667         )
668         # write status in another task
669 0         status_task = asyncio.ensure_future(
670             coro_or_future=self._store_status(
671                 cluster_uuid=cluster_uuid,
672                 kdu_instance=kdu_instance,
673                 db_dict=db_dict,
674                 operation="rollback",
675                 run_once=False,
676             )
677         )
678
679         # wait for execution task
680 0         await asyncio.wait([exec_task])
681
682         # cancel status task
683 0         status_task.cancel()
684
685 0         output, rc = exec_task.result()
686
687         # write final status
688 0         await self._store_status(
689             cluster_uuid=cluster_uuid,
690             kdu_instance=kdu_instance,
691             db_dict=db_dict,
692             operation="rollback",
693             run_once=True,
694             check_every=0,
695         )
696
697 0         if rc != 0:
698 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
699 0             self.log.error(msg)
700 0             raise K8sException(msg)
701
702         # return new revision number
703 0         instance = await self.get_instance_info(
704             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
705         )
706 0         if instance:
707 0             revision = int(instance.get("Revision"))
708 0             self.log.debug("New revision: {}".format(revision))
709 0             return revision
710         else:
711 0             return 0
712
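
upgrade and rollback pair naturally: upgrade returns the new revision number, which a caller can later hand back to rollback. A hedged sketch:

async def upgrade_then_undo(connector, cluster_uuid: str, kdu_instance: str):
    new_rev = await connector.upgrade(
        cluster_uuid=cluster_uuid,
        kdu_instance=kdu_instance,
        kdu_model="stable/openldap:1.2.3",  # example chart reference
    )
    # roll back to the revision that preceded the upgrade
    return await connector.rollback(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, revision=new_rev - 1
    )
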
713 0     async def uninstall(self, cluster_uuid: str, kdu_instance: str):
714         """
715         Removes an existing KDU instance via the helm `delete --purge` call
716         (invoked after all _terminate-config-primitive_ actions of the VNF
717         have run).
718
719         :param cluster_uuid: UUID of a K8s cluster known by OSM
720         :param kdu_instance: unique name for the KDU instance to be deleted
721         :return: True if successful
722         """
723
724 0         self.log.debug(
725             "uninstall kdu_instance {} from cluster {}".format(
726                 kdu_instance, cluster_uuid
727             )
728         )
729
730         # config filename
731 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
732             cluster_name=cluster_uuid, create_if_not_exist=True
733         )
734
735 0         command = "{} --kubeconfig={} --home={} delete --purge {}".format(
736             self._helm_command, config_filename, helm_dir, kdu_instance
737         )
738
739 0         output, _rc = await self._local_async_exec(
740             command=command, raise_exception_on_error=True
741         )
742
743 0         return self._output_to_table(output)
744
745 0     async def exec_primitive(
746         self,
747         cluster_uuid: str = None,
748         kdu_instance: str = None,
749         primitive_name: str = None,
750         timeout: float = 300,
751         params: dict = None,
752         db_dict: dict = None,
753     ) -> str:
754         """Exec primitive (Juju action)
755
756         :param cluster_uuid str: The UUID of the cluster
757         :param kdu_instance str: The unique name of the KDU instance
758         :param primitive_name: Name of action that will be executed
759         :param timeout: Timeout for action execution
760         :param params: Dictionary of all the parameters needed for the action
761         :param db_dict: Dictionary for any additional data
762
763         :return: Returns the output of the action
764         """
765 0         raise K8sException(
766             "KDUs deployed with Helm don't support actions "
767             "different from rollback, upgrade and status"
768         )
769
770 0     async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
771
772 0         self.log.debug(
773             "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
774         )
775
776 0         return await self._exec_inspect_command(
777             inspect_command="", kdu_model=kdu_model, repo_url=repo_url
778         )
779
780 0     async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
781
782 0         self.log.debug(
783             "inspect kdu_model values {} from (optional) repo: {}".format(
784                 kdu_model, repo_url
785             )
786         )
787
788 0         return await self._exec_inspect_command(
789             inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
790         )
791
792 0     async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
793
794 0         self.log.debug(
795             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
796         )
797
798 0         return await self._exec_inspect_command(
799             inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
800         )
801
802 0     async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
803
804         # call internal function
805 0         return await self._status_kdu(
806             cluster_uuid=cluster_uuid,
807             kdu_instance=kdu_instance,
808             show_error_log=True,
809             return_text=True,
810         )
811
812 0     async def synchronize_repos(self, cluster_uuid: str):
813
814 0         self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
815 0         try:
816 0             update_repos_timeout = (
817                 300  # max timeout to sync a single repos, more than this is too much
818             )
819 0             db_k8scluster = self.db.get_one(
820                 "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
821             )
822 0             if db_k8scluster:
823 0                 nbi_repo_list = (
824                     db_k8scluster.get("_admin").get("helm_chart_repos") or []
825                 )
826 0                 cluster_repo_dict = (
827                     db_k8scluster.get("_admin").get("helm_charts_added") or {}
828                 )
829                 # elements that must be deleted
830 0                 deleted_repo_list = []
831 0                 added_repo_dict = {}
832 0                 self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
833 0                 self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))
834
835                 # obtain repos to add: registered by nbi but not added
836 0                 repos_to_add = [
837                     repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
838                 ]
839
840                 # obtain repos to delete: added by cluster but not in nbi list
841 0                 repos_to_delete = [
842                     repo
843                     for repo in cluster_repo_dict.keys()
844                     if repo not in nbi_repo_list
845                 ]
846
847                 # delete repos: deletions must run before additions because
848                 # different repos may share the same name while having a
849                 # different id and url
850 0                 self.log.debug("repos to delete: {}".format(repos_to_delete))
851 0                 for repo_id in repos_to_delete:
852                     # try to delete repos
853 0                     try:
854 0                         repo_delete_task = asyncio.ensure_future(
855                             self.repo_remove(
856                                 cluster_uuid=cluster_uuid,
857                                 name=cluster_repo_dict[repo_id],
858                             )
859                         )
860 0                         await asyncio.wait_for(repo_delete_task, update_repos_timeout)
861 0                     except Exception as e:
862 0                         self.log.warning(
863                             "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
864                                 repo_id, cluster_repo_dict[repo_id], str(e)
865                             )
866                         )
867                     # always record the repo as deleted, even on error: if it
868                     # is no longer there, the deletion failing is expected and
869                     # should not block the synchronization
870 0                     deleted_repo_list.append(repo_id)
871
872                 # add repos
873 0                 self.log.debug("repos to add: {}".format(repos_to_add))
874 0                 for repo_id in repos_to_add:
875                     # obtain the repo data from the db
876                     # if there is an error getting the repo in the database we will
877                     # ignore this repo and continue
878                     # because there is a possible race condition where the repo has
879                     # been deleted while processing
880 0                     db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
881 0                     self.log.debug(
882                         "obtained repo: id, {}, name: {}, url: {}".format(
883                             repo_id, db_repo["name"], db_repo["url"]
884                         )
885                     )
886 0                     try:
887 0                         repo_add_task = asyncio.ensure_future(
888                             self.repo_add(
889                                 cluster_uuid=cluster_uuid,
890                                 name=db_repo["name"],
891                                 url=db_repo["url"],
892                                 repo_type="chart",
893                             )
894                         )
895 0                         await asyncio.wait_for(repo_add_task, update_repos_timeout)
896 0                         added_repo_dict[repo_id] = db_repo["name"]
897 0                         self.log.debug(
898                             "added repo: id, {}, name: {}".format(
899                                 repo_id, db_repo["name"]
900                             )
901                         )
902 0                     except Exception as e:
903                         # log but do not raise on errors adding a repo: adding
904                         # a repo that already exists does not raise any error,
905                         # and a single wrong repo registered by anyone must not
906                         # prevent instantiating any NS
907 0                         self.log.error(
908                             "Error adding repo id: {}, err_msg: {} ".format(
909                                 repo_id, repr(e)
910                             )
911                         )
912
913 0                 return deleted_repo_list, added_repo_dict
914
915             else:  # else db_k8scluster does not exist
916 0                 raise K8sException(
917                     "k8cluster with helm-id : {} not found".format(cluster_uuid)
918                 )
919
920 0         except Exception as e:
921 0             self.log.error("Error synchronizing repos: {}".format(str(e)))
922 0             raise K8sException("Error synchronizing repos")
923
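
The add/delete computation above is a plain set difference between the NBI repo list and the per-cluster repo dict; a standalone illustration:

nbi_repo_list = ["repo-a", "repo-b"]                          # registered in NBI
cluster_repo_dict = {"repo-b": "name-b", "repo-c": "name-c"}  # already added to the cluster

repos_to_add = [r for r in nbi_repo_list if not cluster_repo_dict.get(r)]
repos_to_delete = [r for r in cluster_repo_dict if r not in nbi_repo_list]

assert repos_to_add == ["repo-a"] and repos_to_delete == ["repo-c"]
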
924     """
925     ####################################################################################
926     ################################### P R I V A T E ##################################
927     ####################################################################################
928     """
929
930 0     async def _exec_inspect_command(
931         self, inspect_command: str, kdu_model: str, repo_url: str = None
932     ):
933
934 0         repo_str = ""
935 0         if repo_url:
936 0             repo_str = " --repo {}".format(repo_url)
937 0             idx = kdu_model.find("/")
938 0             if idx >= 0:
939 0                 idx += 1
940 0                 kdu_model = kdu_model[idx:]
941
942 0         inspect_command = "{} inspect {} {}{}".format(
943             self._helm_command, inspect_command, kdu_model, repo_str
944         )
945 0         output, _rc = await self._local_async_exec(
946             command=inspect_command, encode_utf8=True
947         )
948
949 0         return output
950
951 0     async def _status_kdu(
952         self,
953         cluster_uuid: str,
954         kdu_instance: str,
955         show_error_log: bool = False,
956         return_text: bool = False,
957     ):
958
959 0         self.log.debug("status of kdu_instance {}".format(kdu_instance))
960
961         # config filename
962 0         _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
963             cluster_name=cluster_uuid, create_if_not_exist=True
964         )
965
966 0         command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
967             self._helm_command, config_filename, helm_dir, kdu_instance
968         )
969
970 0         output, rc = await self._local_async_exec(
971             command=command,
972             raise_exception_on_error=True,
973             show_error_log=show_error_log,
974         )
975
976 0         if return_text:
977 0             return str(output)
978
979 0         if rc != 0:
980 0             return None
981
982 0         data = yaml.load(output, Loader=yaml.SafeLoader)
983
984         # remove field 'notes'
985 0         try:
986 0             del data.get("info").get("status")["notes"]
987 0         except KeyError:
988 0             pass
989
990         # parse field 'resources'
991 0         try:
992 0             resources = str(data.get("info").get("status").get("resources"))
993 0             resource_table = self._output_to_table(resources)
994 0             data.get("info").get("status")["resources"] = resource_table
995 0         except Exception:
996 0             pass
997
998 0         return data
999
1000 0     async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
1001 0         instances = await self.instances_list(cluster_uuid=cluster_uuid)
1002 0         for instance in instances:
1003 0             if instance.get("Name") == kdu_instance:
1004 0                 return instance
1005 0         self.log.debug("Instance {} not found".format(kdu_instance))
1006 0         return None
1007
1008 0     @staticmethod
1009 0     def _generate_release_name(chart_name: str):
1010         # check for an embedded chart (file or dir)
1011 0         if chart_name.startswith("/"):
1012             # extract file or directory name
1013 0             chart_name = chart_name[chart_name.rfind("/") + 1 :]
1014         # check URL
1015 0         elif "://" in chart_name:
1016             # extract last portion of URL
1017 0             chart_name = chart_name[chart_name.rfind("/") + 1 :]
1018
1019 0         name = ""
1020 0         for c in chart_name:
1021 0             if c.isalpha() or c.isnumeric():
1022 0                 name += c
1023             else:
1024 0                 name += "-"
1025 0         if len(name) > 35:
1026 0             name = name[0:35]
1027
1028         # if it does not start with an alphabetic character, prefix 'a'
1029 0         if not name[0].isalpha():
1030 0             name = "a" + name
1031
1032 0         name += "-"
1033
1034 0         def get_random_number():
1035 0             r = random.randrange(start=1, stop=99999999)
1036 0             s = str(r)
1037 0             s = s.rjust(10, "0")
1038 0             return s
1039
1040 0         name = name + get_random_number()
1041 0         return name.lower()
1042
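
The shape of the generated names follows directly from the code: non-alphanumerics become dashes, the chart part is truncated to 35 characters, an 'a' is prefixed when needed, and a zero-padded 10-digit random suffix is appended. A small illustration:

from n2vc.k8s_helm_conn import K8sHelmConnector

name = K8sHelmConnector._generate_release_name("stable/openldap")
print(name)  # e.g. 'stable-openldap-0012345678' (the suffix varies)
assert name == name.lower() and name[0].isalpha()
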
1043 0     async def _store_status(
1044         self,
1045         cluster_uuid: str,
1046         operation: str,
1047         kdu_instance: str,
1048         check_every: float = 10,
1049         db_dict: dict = None,
1050         run_once: bool = False,
1051     ):
1052 0         while True:
1053 0             try:
1054 0                 await asyncio.sleep(check_every)
1055 0                 detailed_status = await self.status_kdu(
1056                     cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
1057                 )
1058 0                 status = detailed_status.get("info").get("Description")
1059 0                 self.log.debug("STATUS:\n{}".format(status))
1060 0                 self.log.debug("DETAILED STATUS:\n{}".format(detailed_status))
1061                 # write status to db
1062 0                 result = await self.write_app_status_to_db(
1063                     db_dict=db_dict,
1064                     status=str(status),
1065                     detailed_status=str(detailed_status),
1066                     operation=operation,
1067                 )
1068 0                 if not result:
1069 0                     self.log.info("Error writing in database. Task exiting...")
1070 0                     return
1071 0             except asyncio.CancelledError:
1072 0                 self.log.debug("Task cancelled")
1073 0                 return
1074 0             except Exception as e:
1075 0                 self.log.debug("_store_status exception: {}".format(str(e)))
1076 0                 pass
1077             finally:
1078 0                 if run_once:
1079 0                     return
1080
1081 0     async def _is_install_completed(self, cluster_uuid: str, kdu_instance: str) -> bool:
1082
1083 0         status = await self._status_kdu(
1084             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False
1085         )
1086
1087         # extract info.status.resources-> str
1088         # format:
1089         #       ==> v1/Deployment
1090         #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
1091         #       halting-horse-mongodb   0/1     1            0           0s
1092         #       halting-petit-mongodb   1/1     1            0           0s
1093         # blank line
1094 0         resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))
1095
1096         # convert to table
1097 0         resources = K8sHelmConnector._output_to_table(resources)
1098
1099 0         num_lines = len(resources)
1100 0         index, ready = 0, True  # assume ready until a resource reports otherwise
1101 0         while index < num_lines:
1102 0             try:
1103 0                 line1 = resources[index]
1104 0                 index += 1
1105                 # find '==>' in column 0
1106 0                 if line1[0] == "==>":
1107 0                     line2 = resources[index]
1108 0                     index += 1
1109                     # find READY in column 1
1110 0                     if line2[1] == "READY":
1111                         # read next lines
1112 0                         line3 = resources[index]
1113 0                         index += 1
1114 0                         while len(line3) > 1 and index < num_lines:
1115 0                             ready_value = line3[1]
1116 0                             parts = ready_value.split(sep="/")
1117 0                             current = int(parts[0])
1118 0                             total = int(parts[1])
1119 0                             if current < total:
1120 0                                 self.log.debug("NOT READY:\n    {}".format(line3))
1121 0                                 ready = False
1122 0                             line3 = resources[index]
1123 0                             index += 1
1124
1125 0             except Exception:
1126 0                 pass
1127
1128 0         return ready
1129
1130 0     @staticmethod
1131 0     def _get_deep(dictionary: dict, members: tuple):
1132 0         target = dictionary
1133 0         value = None
1134 0         try:
1135 0             for m in members:
1136 0                 value = target.get(m)
1137 0                 if not value:
1138 0                     return None
1139                 else:
1140 0                     target = value
1141 0         except Exception:
1142 0             pass
1143 0         return value
1144
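
_get_deep is a null-safe nested lookup; a standalone illustration:

from n2vc.k8s_helm_conn import K8sHelmConnector

status = {"info": {"status": {"resources": "==> v1/Deployment"}}}
assert K8sHelmConnector._get_deep(status, ("info", "status", "resources")) == "==> v1/Deployment"
assert K8sHelmConnector._get_deep(status, ("info", "missing", "resources")) is None
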
1145     # find key:value in several lines
1146 0     @staticmethod
1147 0     def _find_in_lines(p_lines: list, p_key: str) -> str:
1148 0         for line in p_lines:
1149 0             try:
1150 0                 if line.startswith(p_key + ":"):
1151 0                     parts = line.split(":")
1152 0                     the_value = parts[1].strip()
1153 0                     return the_value
1154 0             except Exception:
1155                 # ignore it
1156 0                 pass
1157 0         return None
1158
1159     # params for use in a -f values file
1160     # returns the values-file option and the filename (so the caller can delete it afterwards)
1161 0     def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
1162
1163 0         if params and len(params) > 0:
1164 0             self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
1165
1166 0             def get_random_number():
1167 0                 r = random.randrange(start=1, stop=99999999)
1168 0                 s = str(r)
1169 0                 while len(s) < 10:
1170 0                     s = "0" + s
1171 0                 return s
1172
1173 0             params2 = dict()
1174 0             for key in params:
1175 0                 value = params.get(key)
1176 0                 if "!!yaml" in str(value):
1177 0                     value = yaml.load(value[7:], Loader=yaml.SafeLoader)
1178 0                 params2[key] = value
1179
1180 0             values_file = get_random_number() + ".yaml"
1181 0             with open(values_file, "w") as stream:
1182 0                 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1183
1184 0             return "-f {}".format(values_file), values_file
1185
1186 0         return "", None
1187
1188     # params for use in --set option
1189 0     @staticmethod
1190 0     def _params_to_set_option(params: dict) -> str:
1191 0         params_str = ""
1192 0         if params and len(params) > 0:
1193 0             start = True
1194 0             for key in params:
1195 0                 value = params.get(key, None)
1196 0                 if value is not None:
1197 0                     if start:
1198 0                         params_str += "--set "
1199 0                         start = False
1200                     else:
1201 0                         params_str += ","
1202 0                     params_str += "{}={}".format(key, value)
1203 0         return params_str
1204
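
_params_to_set_option joins the parameters into a single --set flag; since dicts preserve insertion order in Python 3.7+, the output below is deterministic:

from n2vc.k8s_helm_conn import K8sHelmConnector

opts = K8sHelmConnector._params_to_set_option({"replicaCount": 2, "service.type": "NodePort"})
assert opts == "--set replicaCount=2,service.type=NodePort"
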
1205 0     @staticmethod
1206 0     def _output_to_lines(output: str) -> list:
1207 0         output_lines = list()
1208 0         lines = output.splitlines(keepends=False)
1209 0         for line in lines:
1210 0             line = line.strip()
1211 0             if len(line) > 0:
1212 0                 output_lines.append(line)
1213 0         return output_lines
1214
1215 0     @staticmethod
1216 0     def _output_to_table(output: str) -> list:
1217 0         output_table = list()
1218 0         lines = output.splitlines(keepends=False)
1219 0         for line in lines:
1220 0             line = line.replace("\t", " ")
1221 0             line_list = list()
1222 0             output_table.append(line_list)
1223 0             cells = line.split(sep=" ")
1224 0             for cell in cells:
1225 0                 cell = cell.strip()
1226 0                 if len(cell) > 0:
1227 0                     line_list.append(cell)
1228 0         return output_table
1229
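
_output_to_table splits kubectl/helm tabular output into a list of cell lists, collapsing runs of whitespace; for instance:

from n2vc.k8s_helm_conn import K8sHelmConnector

output = "NAME           READY   STATUS\ntiller-deploy  1/1     Running"
table = K8sHelmConnector._output_to_table(output)
assert table == [["NAME", "READY", "STATUS"], ["tiller-deploy", "1/1", "Running"]]
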
1230 0     def _get_paths(
1231         self, cluster_name: str, create_if_not_exist: bool = False
1232     ) -> (str, str, str, str):
1233         """
1234         Returns kube and helm directories
1235
1236         :param cluster_name:
1237         :param create_if_not_exist:
1238         :return: kube, helm directories, config filename and cluster dir.
1239                 Raises exception if not exist and cannot create
1240         """
1241
1242 0         base = self.fs.path
1243 0         if base.endswith("/") or base.endswith("\\"):
1244 0             base = base[:-1]
1245
1246         # base dir for cluster
1247 0         cluster_dir = base + "/" + cluster_name
1248 0         if create_if_not_exist and not os.path.exists(cluster_dir):
1249 0             self.log.debug("Creating dir {}".format(cluster_dir))
1250 0             os.makedirs(cluster_dir)
1251 0         if not os.path.exists(cluster_dir):
1252 0             msg = "Base cluster dir {} does not exist".format(cluster_dir)
1253 0             self.log.error(msg)
1254 0             raise K8sException(msg)
1255
1256         # kube dir
1257 0         kube_dir = cluster_dir + "/" + ".kube"
1258 0         if create_if_not_exist and not os.path.exists(kube_dir):
1259 0             self.log.debug("Creating dir {}".format(kube_dir))
1260 0             os.makedirs(kube_dir)
1261 0         if not os.path.exists(kube_dir):
1262 0             msg = "Kube config dir {} does not exist".format(kube_dir)
1263 0             self.log.error(msg)
1264 0             raise K8sException(msg)
1265
1266         # helm home dir
1267 0         helm_dir = cluster_dir + "/" + ".helm"
1268 0         if create_if_not_exist and not os.path.exists(helm_dir):
1269 0             self.log.debug("Creating dir {}".format(helm_dir))
1270 0             os.makedirs(helm_dir)
1271 0         if not os.path.exists(helm_dir):
1272 0             msg = "Helm config dir {} does not exist".format(helm_dir)
1273 0             self.log.error(msg)
1274 0             raise K8sException(msg)
1275
1276 0         config_filename = kube_dir + "/config"
1277 0         return kube_dir, helm_dir, config_filename, cluster_dir
1278
1279 0     @staticmethod
1280     def _remove_multiple_spaces(strobj):
1281 0         strobj = strobj.strip()
1282 0         while "  " in strobj:
1283 0             strobj = strobj.replace("  ", " ")
1284 0         return strobj
1285
1286 0     def _local_exec(self, command: str) -> (str, int):
1287 0         command = K8sHelmConnector._remove_multiple_spaces(command)
1288 0         self.log.debug("Executing sync local command: {}".format(command))
1289         # raise exception if fails
1290 0         output = ""
1291 0         try:
1292 0             output = subprocess.check_output(
1293                 command, shell=True, universal_newlines=True
1294             )
1295 0             return_code = 0
1296 0             self.log.debug(output)
1297 0         except Exception:
1298 0             return_code = 1
1299
1300 0         return output, return_code
1301
1302 0     async def _local_async_exec(
1303         self,
1304         command: str,
1305         raise_exception_on_error: bool = False,
1306         show_error_log: bool = True,
1307         encode_utf8: bool = False,
1308     ) -> (str, int):
1309
1310 0         command = K8sHelmConnector._remove_multiple_spaces(command)
1311 0         self.log.debug("Executing async local command: {}".format(command))
1312
1313         # split command
1314 0         command = command.split(sep=" ")
1315
1316 0         try:
1317 0             process = await asyncio.create_subprocess_exec(
1318                 *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
1319             )
1320
1321             # wait for command terminate
1322 0             stdout, stderr = await process.communicate()
1323
1324 0             return_code = process.returncode
1325
1326 0             output = ""
1327 0             if stdout:
1328 0                 output = stdout.decode("utf-8").strip()
1329                 # output = stdout.decode()
1330 0             if stderr:
1331 0                 output = stderr.decode("utf-8").strip()
1332                 # output = stderr.decode()
1333
1334 0             if return_code != 0 and show_error_log:
1335 0                 self.log.debug(
1336                     "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1337                 )
1338             else:
1339 0                 self.log.debug("Return code: {}".format(return_code))
1340
1341 0             if raise_exception_on_error and return_code != 0:
1342 0                 raise K8sException(output)
1343
1344 0             if encode_utf8:
1345 0                 output = output.encode("utf-8").strip()
1346 0                 output = str(output).replace("\\n", "\n")
1347
1348 0             return output, return_code
1349
1350 0         except asyncio.CancelledError:
1351 0             raise
1352 0         except K8sException:
1353 0             raise
1354 0         except Exception as e:
1355 0             msg = "Exception executing command: {} -> {}".format(command, e)
1356 0             self.log.error(msg)
1357 0             if raise_exception_on_error:
1358 0                 raise K8sException(e) from e
1359             else:
1360 0                 return "", -1
1361
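
The subprocess pattern used by _local_async_exec, reduced to a standalone sketch. Splitting on single spaces means arguments themselves must not contain spaces, which holds for the commands this module builds:

import asyncio

async def run(command: str):
    proc = await asyncio.create_subprocess_exec(
        *command.split(" "),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    output = (stdout or stderr).decode("utf-8").strip()
    return output, proc.returncode

# asyncio.get_event_loop().run_until_complete(run("helm version --client"))
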
1362 0     def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
1363         # self.log.debug('Checking if file {} exists...'.format(filename))
1364 0         if os.path.exists(filename):
1365 0             return True
1366         else:
1367 0             msg = "File {} does not exist".format(filename)
1368 0             if exception_if_not_exists:
1369                 # self.log.error(msg)
1370 0                 raise K8sException(msg)