Code Coverage

Cobertura Coverage Report > n2vc >

k8s_helm_base_conn.py

Trend

Classes100%
 
Lines57%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
k8s_helm_base_conn.py
100%
1/1
57%
339/595
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_helm_base_conn.py
57%
339/595
N/A

Source

n2vc/k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 1 import abc
23 1 import asyncio
24 1 import random
25 1 import time
26 1 import shlex
27 1 import shutil
28 1 import stat
29 1 import subprocess
30 1 import os
31 1 import yaml
32 1 from uuid import uuid4
33
34 1 from n2vc.exceptions import K8sException
35 1 from n2vc.k8s_conn import K8sConnector
36
37
38 1 class K8sHelmBaseConnector(K8sConnector):
39
40     """
41     ####################################################################################
42     ################################### P U B L I C ####################################
43     ####################################################################################
44     """
45 1     service_account = "osm"
46 1     _STABLE_REPO_URL = "https://charts.helm.sh/stable"
47
48 1     def __init__(
49         self,
50         fs: object,
51         db: object,
52         kubectl_command: str = "/usr/bin/kubectl",
53         helm_command: str = "/usr/bin/helm",
54         log: object = None,
55         on_update_db=None,
56         vca_config: dict = None,
57     ):
58         """
59
60         :param fs: file system for kubernetes and helm configuration
61         :param db: database object to write current operation status
62         :param kubectl_command: path to kubectl executable
63         :param helm_command: path to helm executable
64         :param log: logger
65         :param on_update_db: callback called when k8s connector updates database
66         """
67
68         # parent class
69 1         K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 1         self.log.info("Initializing K8S Helm connector")
72
73         # random numbers for release name generation
74 1         random.seed(time.time())
75
76         # the file system
77 1         self.fs = fs
78
79         # exception if kubectl is not installed
80 1         self.kubectl_command = kubectl_command
81 1         self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
82
83         # exception if helm is not installed
84 1         self._helm_command = helm_command
85 1         self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
86
87         # obtain stable repo url from config or apply default
88 1         if not vca_config or not vca_config.get("stablerepourl"):
89 1             self._stable_repo_url = self._STABLE_REPO_URL
90         else:
91 0             self._stable_repo_url = vca_config.get("stablerepourl")
92
93 1     @staticmethod
94 1     def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95         """
96         Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97         cluster_id for backward compatibility
98         """
99 1         namespace, _, cluster_id = cluster_uuid.rpartition(':')
100 1         return namespace, cluster_id
101
102 1     async def init_env(
103             self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
104     ) -> (str, bool):
105         """
106         It prepares a given K8s cluster environment to run Charts
107
108         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
109             '.kube/config'
110         :param namespace: optional namespace to be used for helm. By default,
111             'kube-system' will be used
112         :param reuse_cluster_uuid: existing cluster uuid for reuse
113         :return: uuid of the K8s cluster and True if connector has installed some
114             software in the cluster
115         (on error, an exception will be raised)
116         """
117
118 1         if reuse_cluster_uuid:
119 1             namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
120 1             namespace = namespace_ or namespace
121         else:
122 0             cluster_id = str(uuid4())
123 1         cluster_uuid = "{}:{}".format(namespace, cluster_id)
124
125 1         self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
126
127 1         paths, env = self._init_paths_env(
128             cluster_name=cluster_id, create_if_not_exist=True
129         )
130 1         mode = stat.S_IRUSR | stat.S_IWUSR
131 1         with open(paths["kube_config"], "w", mode) as f:
132 1             f.write(k8s_creds)
133 1         os.chmod(paths["kube_config"], 0o600)
134
135         # Code with initialization specific of helm version
136 1         n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
137
138         # sync fs with local data
139 1         self.fs.reverse_sync(from_path=cluster_id)
140
141 1         self.log.info("Cluster {} initialized".format(cluster_id))
142
143 1         return cluster_uuid, n2vc_installed_sw
144
145 1     async def repo_add(
146             self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
147     ):
148 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
149 1         self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
150             cluster_id, repo_type, name, url))
151
152         # sync local dir
153 1         self.fs.sync(from_path=cluster_id)
154
155         # init_env
156 1         paths, env = self._init_paths_env(
157             cluster_name=cluster_id, create_if_not_exist=True
158         )
159
160         # helm repo update
161 1         command = "{} repo update".format(
162             self._helm_command
163         )
164 1         self.log.debug("updating repo: {}".format(command))
165 1         await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)
166
167         # helm repo add name url
168 1         command = "{} repo add {} {}".format(
169             self._helm_command, name, url
170         )
171 1         self.log.debug("adding repo: {}".format(command))
172 1         await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
173
174         # sync fs
175 1         self.fs.reverse_sync(from_path=cluster_id)
176
177 1     async def repo_list(self, cluster_uuid: str) -> list:
178         """
179         Get the list of registered repositories
180
181         :return: list of registered repositories: [ (name, url) .... ]
182         """
183
184 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
185 1         self.log.debug("list repositories for cluster {}".format(cluster_id))
186
187         # sync local dir
188 1         self.fs.sync(from_path=cluster_id)
189
190         # config filename
191 1         paths, env = self._init_paths_env(
192             cluster_name=cluster_id, create_if_not_exist=True
193         )
194
195 1         command = "{} repo list --output yaml".format(
196             self._helm_command
197         )
198
199         # Set exception to false because if there are no repos just want an empty list
200 1         output, _rc = await self._local_async_exec(
201             command=command, raise_exception_on_error=False, env=env
202         )
203
204         # sync fs
205 1         self.fs.reverse_sync(from_path=cluster_id)
206
207 1         if _rc == 0:
208 1             if output and len(output) > 0:
209 0                 repos = yaml.load(output, Loader=yaml.SafeLoader)
210                 # unify format between helm2 and helm3 setting all keys lowercase
211 0                 return self._lower_keys_list(repos)
212             else:
213 1                 return []
214         else:
215 0             return []
216
217 1     async def repo_remove(self, cluster_uuid: str, name: str):
218
219 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
220 1         self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
221
222         # sync local dir
223 1         self.fs.sync(from_path=cluster_id)
224
225         # init env, paths
226 1         paths, env = self._init_paths_env(
227             cluster_name=cluster_id, create_if_not_exist=True
228         )
229
230 1         command = "{} repo remove {}".format(
231             self._helm_command, name
232         )
233 1         await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
234
235         # sync fs
236 1         self.fs.reverse_sync(from_path=cluster_id)
237
238 1     async def reset(
239             self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
240     ) -> bool:
241
242 1         namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
243 1         self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
244                        .format(cluster_id, uninstall_sw))
245
246         # sync local dir
247 1         self.fs.sync(from_path=cluster_id)
248
249         # uninstall releases if needed.
250 1         if uninstall_sw:
251 1             releases = await self.instances_list(cluster_uuid=cluster_uuid)
252 1             if len(releases) > 0:
253 1                 if force:
254 1                     for r in releases:
255 1                         try:
256 1                             kdu_instance = r.get("name")
257 1                             chart = r.get("chart")
258 1                             self.log.debug(
259                                 "Uninstalling {} -> {}".format(chart, kdu_instance)
260                             )
261 1                             await self.uninstall(
262                                 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
263                             )
264 0                         except Exception as e:
265                             # will not raise exception as it was found
266                             # that in some cases of previously installed helm releases it
267                             # raised an error
268 0                             self.log.warn(
269                                 "Error uninstalling release {}: {}".format(kdu_instance, e)
270                             )
271                 else:
272 0                     msg = (
273                         "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
274                     ).format(cluster_id)
275 0                     self.log.warn(msg)
276 0                     uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller
277
278 1         if uninstall_sw:
279 1             await self._uninstall_sw(cluster_id, namespace)
280
281         # delete cluster directory
282 1         self.log.debug("Removing directory {}".format(cluster_id))
283 1         self.fs.file_delete(cluster_id, ignore_non_exist=True)
284         # Remove also local directorio if still exist
285 1         direct = self.fs.path + "/" + cluster_id
286 1         shutil.rmtree(direct, ignore_errors=True)
287
288 1         return True
289
290 1     async def _install_impl(
291             self,
292             cluster_id: str,
293             kdu_model: str,
294             paths: dict,
295             env: dict,
296             kdu_instance: str,
297             atomic: bool = True,
298             timeout: float = 300,
299             params: dict = None,
300             db_dict: dict = None,
301             kdu_name: str = None,
302             namespace: str = None,
303     ):
304         # params to str
305 1         params_str, file_to_delete = self._params_to_file_option(
306             cluster_id=cluster_id, params=params
307         )
308
309         # version
310 1         version = None
311 1         if ":" in kdu_model:
312 1             parts = kdu_model.split(sep=":")
313 1             if len(parts) == 2:
314 1                 version = str(parts[1])
315 1                 kdu_model = parts[0]
316
317 1         command = self._get_install_command(kdu_model, kdu_instance, namespace,
318                                             params_str, version, atomic, timeout)
319
320 1         self.log.debug("installing: {}".format(command))
321
322 1         if atomic:
323             # exec helm in a task
324 1             exec_task = asyncio.ensure_future(
325                 coro_or_future=self._local_async_exec(
326                     command=command, raise_exception_on_error=False, env=env
327                 )
328             )
329
330             # write status in another task
331 1             status_task = asyncio.ensure_future(
332                 coro_or_future=self._store_status(
333                     cluster_id=cluster_id,
334                     kdu_instance=kdu_instance,
335                     namespace=namespace,
336                     db_dict=db_dict,
337                     operation="install",
338                     run_once=False,
339                 )
340             )
341
342             # wait for execution task
343 1             await asyncio.wait([exec_task])
344
345             # cancel status task
346 1             status_task.cancel()
347
348 1             output, rc = exec_task.result()
349
350         else:
351
352 0             output, rc = await self._local_async_exec(
353                 command=command, raise_exception_on_error=False, env=env
354             )
355
356         # remove temporal values yaml file
357 1         if file_to_delete:
358 0             os.remove(file_to_delete)
359
360         # write final status
361 1         await self._store_status(
362             cluster_id=cluster_id,
363             kdu_instance=kdu_instance,
364             namespace=namespace,
365             db_dict=db_dict,
366             operation="install",
367             run_once=True,
368             check_every=0,
369         )
370
371 1         if rc != 0:
372 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
373 0             self.log.error(msg)
374 0             raise K8sException(msg)
375
376 1     async def upgrade(
377         self,
378         cluster_uuid: str,
379         kdu_instance: str,
380         kdu_model: str = None,
381         atomic: bool = True,
382         timeout: float = 300,
383         params: dict = None,
384         db_dict: dict = None,
385     ):
386 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
387 1         self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
388
389         # sync local dir
390 1         self.fs.sync(from_path=cluster_id)
391
392         # look for instance to obtain namespace
393 1         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
394 1         if not instance_info:
395 0             raise K8sException("kdu_instance {} not found".format(kdu_instance))
396
397         # init env, paths
398 1         paths, env = self._init_paths_env(
399             cluster_name=cluster_id, create_if_not_exist=True
400         )
401
402         # params to str
403 1         params_str, file_to_delete = self._params_to_file_option(
404             cluster_id=cluster_id, params=params
405         )
406
407         # version
408 1         version = None
409 1         if ":" in kdu_model:
410 1             parts = kdu_model.split(sep=":")
411 1             if len(parts) == 2:
412 1                 version = str(parts[1])
413 1                 kdu_model = parts[0]
414
415 1         command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
416                                             params_str, version, atomic, timeout)
417
418 1         self.log.debug("upgrading: {}".format(command))
419
420 1         if atomic:
421
422             # exec helm in a task
423 1             exec_task = asyncio.ensure_future(
424                 coro_or_future=self._local_async_exec(
425                     command=command, raise_exception_on_error=False, env=env
426                 )
427             )
428             # write status in another task
429 1             status_task = asyncio.ensure_future(
430                 coro_or_future=self._store_status(
431                     cluster_id=cluster_id,
432                     kdu_instance=kdu_instance,
433                     namespace=instance_info["namespace"],
434                     db_dict=db_dict,
435                     operation="upgrade",
436                     run_once=False,
437                 )
438             )
439
440             # wait for execution task
441 1             await asyncio.wait([exec_task])
442
443             # cancel status task
444 1             status_task.cancel()
445 1             output, rc = exec_task.result()
446
447         else:
448
449 0             output, rc = await self._local_async_exec(
450                 command=command, raise_exception_on_error=False, env=env
451             )
452
453         # remove temporal values yaml file
454 1         if file_to_delete:
455 0             os.remove(file_to_delete)
456
457         # write final status
458 1         await self._store_status(
459             cluster_id=cluster_id,
460             kdu_instance=kdu_instance,
461             namespace=instance_info["namespace"],
462             db_dict=db_dict,
463             operation="upgrade",
464             run_once=True,
465             check_every=0,
466         )
467
468 1         if rc != 0:
469 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
470 0             self.log.error(msg)
471 0             raise K8sException(msg)
472
473         # sync fs
474 1         self.fs.reverse_sync(from_path=cluster_id)
475
476         # return new revision number
477 1         instance = await self.get_instance_info(
478             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
479         )
480 1         if instance:
481 1             revision = int(instance.get("revision"))
482 1             self.log.debug("New revision: {}".format(revision))
483 1             return revision
484         else:
485 0             return 0
486
487 1     async def rollback(
488         self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
489     ):
490
491 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
492 1         self.log.debug(
493             "rollback kdu_instance {} to revision {} from cluster {}".format(
494                 kdu_instance, revision, cluster_id
495             )
496         )
497
498         # sync local dir
499 1         self.fs.sync(from_path=cluster_id)
500
501         # look for instance to obtain namespace
502 1         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
503 1         if not instance_info:
504 0             raise K8sException("kdu_instance {} not found".format(kdu_instance))
505
506         # init env, paths
507 1         paths, env = self._init_paths_env(
508             cluster_name=cluster_id, create_if_not_exist=True
509         )
510
511 1         command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
512                                              revision)
513
514 1         self.log.debug("rolling_back: {}".format(command))
515
516         # exec helm in a task
517 1         exec_task = asyncio.ensure_future(
518             coro_or_future=self._local_async_exec(
519                 command=command, raise_exception_on_error=False, env=env
520             )
521         )
522         # write status in another task
523 1         status_task = asyncio.ensure_future(
524             coro_or_future=self._store_status(
525                 cluster_id=cluster_id,
526                 kdu_instance=kdu_instance,
527                 namespace=instance_info["namespace"],
528                 db_dict=db_dict,
529                 operation="rollback",
530                 run_once=False,
531             )
532         )
533
534         # wait for execution task
535 1         await asyncio.wait([exec_task])
536
537         # cancel status task
538 1         status_task.cancel()
539
540 1         output, rc = exec_task.result()
541
542         # write final status
543 1         await self._store_status(
544             cluster_id=cluster_id,
545             kdu_instance=kdu_instance,
546             namespace=instance_info["namespace"],
547             db_dict=db_dict,
548             operation="rollback",
549             run_once=True,
550             check_every=0,
551         )
552
553 1         if rc != 0:
554 0             msg = "Error executing command: {}\nOutput: {}".format(command, output)
555 0             self.log.error(msg)
556 0             raise K8sException(msg)
557
558         # sync fs
559 1         self.fs.reverse_sync(from_path=cluster_id)
560
561         # return new revision number
562 1         instance = await self.get_instance_info(
563             cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
564         )
565 1         if instance:
566 1             revision = int(instance.get("revision"))
567 1             self.log.debug("New revision: {}".format(revision))
568 1             return revision
569         else:
570 0             return 0
571
572 1     async def uninstall(self, cluster_uuid: str, kdu_instance: str):
573         """
574         Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
575         (this call should happen after all _terminate-config-primitive_ of the VNF
576         are invoked).
577
578         :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
579         :param kdu_instance: unique name for the KDU instance to be deleted
580         :return: True if successful
581         """
582
583 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
584 1         self.log.debug(
585             "uninstall kdu_instance {} from cluster {}".format(
586                 kdu_instance, cluster_id
587             )
588         )
589
590         # sync local dir
591 1         self.fs.sync(from_path=cluster_id)
592
593         # look for instance to obtain namespace
594 1         instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
595 1         if not instance_info:
596 0             raise K8sException("kdu_instance {} not found".format(kdu_instance))
597
598         # init env, paths
599 1         paths, env = self._init_paths_env(
600             cluster_name=cluster_id, create_if_not_exist=True
601         )
602
603 1         command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
604 1         output, _rc = await self._local_async_exec(
605             command=command, raise_exception_on_error=True, env=env
606         )
607
608         # sync fs
609 1         self.fs.reverse_sync(from_path=cluster_id)
610
611 1         return self._output_to_table(output)
612
613 1     async def instances_list(self, cluster_uuid: str) -> list:
614         """
615         returns a list of deployed releases in a cluster
616
617         :param cluster_uuid: the 'cluster' or 'namespace:cluster'
618         :return:
619         """
620
621 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
622 1         self.log.debug("list releases for cluster {}".format(cluster_id))
623
624         # sync local dir
625 1         self.fs.sync(from_path=cluster_id)
626
627         # execute internal command
628 1         result = await self._instances_list(cluster_id)
629
630         # sync fs
631 1         self.fs.reverse_sync(from_path=cluster_id)
632
633 1         return result
634
635 1     async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
636 0         instances = await self.instances_list(cluster_uuid=cluster_uuid)
637 0         for instance in instances:
638 0             if instance.get("name") == kdu_instance:
639 0                 return instance
640 0         self.log.debug("Instance {} not found".format(kdu_instance))
641 0         return None
642
643 1     async def exec_primitive(
644         self,
645         cluster_uuid: str = None,
646         kdu_instance: str = None,
647         primitive_name: str = None,
648         timeout: float = 300,
649         params: dict = None,
650         db_dict: dict = None,
651     ) -> str:
652         """Exec primitive (Juju action)
653
654         :param cluster_uuid: The UUID of the cluster or namespace:cluster
655         :param kdu_instance: The unique name of the KDU instance
656         :param primitive_name: Name of action that will be executed
657         :param timeout: Timeout for action execution
658         :param params: Dictionary of all the parameters needed for the action
659         :db_dict: Dictionary for any additional data
660
661         :return: Returns the output of the action
662         """
663 0         raise K8sException(
664             "KDUs deployed with Helm don't support actions "
665             "different from rollback, upgrade and status"
666         )
667
668 1     async def get_services(self,
669                            cluster_uuid: str,
670                            kdu_instance: str,
671                            namespace: str) -> list:
672         """
673         Returns a list of services defined for the specified kdu instance.
674
675         :param cluster_uuid: UUID of a K8s cluster known by OSM
676         :param kdu_instance: unique name for the KDU instance
677         :param namespace: K8s namespace used by the KDU instance
678         :return: If successful, it will return a list of services, Each service
679         can have the following data:
680         - `name` of the service
681         - `type` type of service in the k8 cluster
682         - `ports` List of ports offered by the service, for each port includes at least
683         name, port, protocol
684         - `cluster_ip` Internal ip to be used inside k8s cluster
685         - `external_ip` List of external ips (in case they are available)
686         """
687
688 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
689 1         self.log.debug(
690             "get_services: cluster_uuid: {}, kdu_instance: {}".format(
691                 cluster_uuid, kdu_instance
692             )
693         )
694
695         # sync local dir
696 1         self.fs.sync(from_path=cluster_id)
697
698         # get list of services names for kdu
699 1         service_names = await self._get_services(cluster_id, kdu_instance, namespace)
700
701 1         service_list = []
702 1         for service in service_names:
703 1             service = await self._get_service(cluster_id, service, namespace)
704 1             service_list.append(service)
705
706         # sync fs
707 1         self.fs.reverse_sync(from_path=cluster_id)
708
709 1         return service_list
710
711 1     async def get_service(self,
712                           cluster_uuid: str,
713                           service_name: str,
714                           namespace: str) -> object:
715
716 1         self.log.debug(
717             "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
718                 service_name, namespace, cluster_uuid)
719         )
720
721 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
722
723         # sync local dir
724 1         self.fs.sync(from_path=cluster_id)
725
726 1         service = await self._get_service(cluster_id, service_name, namespace)
727
728         # sync fs
729 1         self.fs.reverse_sync(from_path=cluster_id)
730
731 1         return service
732
733 1     async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
734
735 0         self.log.debug(
736             "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
737                 cluster_uuid, kdu_instance
738             )
739         )
740
741 0         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
742
743         # sync local dir
744 0         self.fs.sync(from_path=cluster_id)
745
746         # get instance: needed to obtain namespace
747 0         instances = await self._instances_list(cluster_id=cluster_id)
748 0         for instance in instances:
749 0             if instance.get("name") == kdu_instance:
750 0                 break
751         else:
752             # instance does not exist
753 0             raise K8sException("Instance name: {} not found in cluster: {}".format(
754                 kdu_instance, cluster_id))
755
756 0         status = await self._status_kdu(
757             cluster_id=cluster_id,
758             kdu_instance=kdu_instance,
759             namespace=instance["namespace"],
760             show_error_log=True,
761             return_text=True,
762         )
763
764         # sync fs
765 0         self.fs.reverse_sync(from_path=cluster_id)
766
767 0         return status
768
769 1     async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
770
771 1         self.log.debug(
772             "inspect kdu_model values {} from (optional) repo: {}".format(
773                 kdu_model, repo_url
774             )
775         )
776
777 1         return await self._exec_inspect_comand(
778             inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
779         )
780
781 1     async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
782
783 1         self.log.debug(
784             "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
785         )
786
787 1         return await self._exec_inspect_comand(
788             inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
789         )
790
791 1     async def synchronize_repos(self, cluster_uuid: str):
792
793 1         self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
794 1         try:
795 1             db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
796 1             db_repo_dict = self._get_db_repos_dict(db_repo_ids)
797
798 1             local_repo_list = await self.repo_list(cluster_uuid)
799 1             local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
800
801 1             deleted_repo_list = []
802 1             added_repo_dict = {}
803
804             # iterate over the list of repos in the database that should be
805             # added if not present
806 1             for repo_name, db_repo in db_repo_dict.items():
807 1                 try:
808                     # check if it is already present
809 1                     curr_repo_url = local_repo_dict.get(db_repo["name"])
810 1                     repo_id = db_repo.get("_id")
811 1                     if curr_repo_url != db_repo["url"]:
812 1                         if curr_repo_url:
813 0                             self.log.debug("repo {} url changed, delete and and again".format(
814                                 db_repo["url"]))
815 0                             await self.repo_remove(cluster_uuid, db_repo["name"])
816 0                             deleted_repo_list.append(repo_id)
817
818                         # add repo
819 1                         self.log.debug("add repo {}".format(db_repo["name"]))
820 1                         await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
821 1                         added_repo_dict[repo_id] = db_repo["name"]
822 0                 except Exception as e:
823 0                     raise K8sException(
824                         "Error adding repo id: {}, err_msg: {} ".format(
825                             repo_id, repr(e)
826                         )
827                     )
828
829             # Delete repos that are present but not in nbi_list
830 1             for repo_name in local_repo_dict:
831 1                 if not db_repo_dict.get(repo_name) and repo_name != "stable":
832 1                     self.log.debug("delete repo {}".format(repo_name))
833 1                     try:
834 1                         await self.repo_remove(cluster_uuid, repo_name)
835 1                         deleted_repo_list.append(repo_name)
836 0                     except Exception as e:
837 0                         self.warning(
838                             "Error deleting repo, name: {}, err_msg: {}".format(
839                                 repo_name, str(e)
840                             )
841                         )
842
843 1             return deleted_repo_list, added_repo_dict
844
845 0         except K8sException:
846 0             raise
847 0         except Exception as e:
848             # Do not raise errors synchronizing repos
849 0             self.log.error("Error synchronizing repos: {}".format(e))
850 0             raise Exception("Error synchronizing repos: {}".format(e))
851
852 1     def _get_db_repos_dict(self, repo_ids: list):
853 1         db_repos_dict = {}
854 1         for repo_id in repo_ids:
855 1             db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
856 1             db_repos_dict[db_repo["name"]] = db_repo
857 1         return db_repos_dict
858
859     """
860     ####################################################################################
861     ################################### TO BE IMPLEMENTED SUBCLASSES ###################
862     ####################################################################################
863     """
864
865 1     @abc.abstractmethod
866 1     def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
867         """
868         Creates and returns base cluster and kube dirs and returns them.
869         Also created helm3 dirs according to new directory specification, paths are
870         not returned but assigned to helm environment variables
871
872         :param cluster_name:  cluster_name
873         :return: Dictionary with config_paths and dictionary with helm environment variables
874         """
875
876 1     @abc.abstractmethod
877     async def _cluster_init(self, cluster_id, namespace, paths, env):
878         """
879         Implements the helm version dependent cluster initialization
880         """
881
882 1     @abc.abstractmethod
883     async def _instances_list(self, cluster_id):
884         """
885         Implements the helm version dependent helm instances list
886         """
887
888 1     @abc.abstractmethod
889     async def _get_services(self, cluster_id, kdu_instance, namespace):
890         """
891         Implements the helm version dependent method to obtain services from a helm instance
892         """
893
894 1     @abc.abstractmethod
895 1     async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
896                           show_error_log: bool = False, return_text: bool = False):
897         """
898         Implements the helm version dependent method to obtain status of a helm instance
899         """
900
901 1     @abc.abstractmethod
902 1     def _get_install_command(self, kdu_model, kdu_instance, namespace,
903                              params_str, version, atomic, timeout) -> str:
904         """
905         Obtain command to be executed to delete the indicated instance
906         """
907
908 1     @abc.abstractmethod
909 1     def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
910                              params_str, version, atomic, timeout) -> str:
911         """
912         Obtain command to be executed to upgrade the indicated instance
913         """
914
915 1     @abc.abstractmethod
916 1     def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
917         """
918         Obtain command to be executed to rollback the indicated instance
919         """
920
921 1     @abc.abstractmethod
922 1     def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
923         """
924         Obtain command to be executed to delete the indicated instance
925         """
926
927 1     @abc.abstractmethod
928 1     def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
929                              version: str):
930         """
931         Obtain command to be executed to obtain information about the kdu
932         """
933
934 1     @abc.abstractmethod
935 1     async def _uninstall_sw(self, cluster_id: str, namespace: str):
936         """
937         Method call to uninstall cluster software for helm. This method is dependent
938         of helm version
939         For Helm v2 it will be called when Tiller must be uninstalled
940         For Helm v3 it does nothing and does not need to be callled
941         """
942
943 1     @abc.abstractmethod
944 1     def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
945         """
946         Obtains the cluster repos identifiers
947         """
948
949     """
950     ####################################################################################
951     ################################### P R I V A T E ##################################
952     ####################################################################################
953     """
954
955 1     @staticmethod
956 1     def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
957 0         if os.path.exists(filename):
958 0             return True
959         else:
960 0             msg = "File {} does not exist".format(filename)
961 0             if exception_if_not_exists:
962 0                 raise K8sException(msg)
963
964 1     @staticmethod
965     def _remove_multiple_spaces(strobj):
966 0         strobj = strobj.strip()
967 0         while "  " in strobj:
968 0             strobj = strobj.replace("  ", " ")
969 0         return strobj
970
971 1     @staticmethod
972 1     def _output_to_lines(output: str) -> list:
973 0         output_lines = list()
974 0         lines = output.splitlines(keepends=False)
975 0         for line in lines:
976 0             line = line.strip()
977 0             if len(line) > 0:
978 0                 output_lines.append(line)
979 0         return output_lines
980
981 1     @staticmethod
982 1     def _output_to_table(output: str) -> list:
983 1         output_table = list()
984 1         lines = output.splitlines(keepends=False)
985 1         for line in lines:
986 0             line = line.replace("\t", " ")
987 0             line_list = list()
988 0             output_table.append(line_list)
989 0             cells = line.split(sep=" ")
990 0             for cell in cells:
991 0                 cell = cell.strip()
992 0                 if len(cell) > 0:
993 0                     line_list.append(cell)
994 1         return output_table
995
996 1     @staticmethod
997 1     def _parse_services(output: str) -> list:
998 0         lines = output.splitlines(keepends=False)
999 0         services = []
1000 0         for line in lines:
1001 0             line = line.replace("\t", " ")
1002 0             cells = line.split(sep=" ")
1003 0             if len(cells) > 0 and cells[0].startswith("service/"):
1004 0                 elems = cells[0].split(sep="/")
1005 0                 if len(elems) > 1:
1006 0                     services.append(elems[1])
1007 0         return services
1008
1009 1     @staticmethod
1010 1     def _get_deep(dictionary: dict, members: tuple):
1011 1         target = dictionary
1012 1         value = None
1013 1         try:
1014 1             for m in members:
1015 1                 value = target.get(m)
1016 0                 if not value:
1017 0                     return None
1018                 else:
1019 0                     target = value
1020 1         except Exception:
1021 1             pass
1022 1         return value
1023
1024     # find key:value in several lines
1025 1     @staticmethod
1026 1     def _find_in_lines(p_lines: list, p_key: str) -> str:
1027 0         for line in p_lines:
1028 0             try:
1029 0                 if line.startswith(p_key + ":"):
1030 0                     parts = line.split(":")
1031 0                     the_value = parts[1].strip()
1032 0                     return the_value
1033 0             except Exception:
1034                 # ignore it
1035 0                 pass
1036 0         return None
1037
1038 1     @staticmethod
1039 1     def _lower_keys_list(input_list: list):
1040         """
1041         Transform the keys in a list of dictionaries to lower case and returns a new list
1042         of dictionaries
1043         """
1044 0         new_list = []
1045 0         for dictionary in input_list:
1046 0             new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1047 0             new_list.append(new_dict)
1048 0         return new_list
1049
1050 1     def _local_exec(self, command: str) -> (str, int):
1051 0         command = self._remove_multiple_spaces(command)
1052 0         self.log.debug("Executing sync local command: {}".format(command))
1053         # raise exception if fails
1054 0         output = ""
1055 0         try:
1056 0             output = subprocess.check_output(
1057                 command, shell=True, universal_newlines=True
1058             )
1059 0             return_code = 0
1060 0             self.log.debug(output)
1061 0         except Exception:
1062 0             return_code = 1
1063
1064 0         return output, return_code
1065
1066 1     async def _local_async_exec(
1067         self,
1068         command: str,
1069         raise_exception_on_error: bool = False,
1070         show_error_log: bool = True,
1071         encode_utf8: bool = False,
1072         env: dict = None
1073     ) -> (str, int):
1074
1075 0         command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1076 0         self.log.debug("Executing async local command: {}, env: {}".format(command, env))
1077
1078         # split command
1079 0         command = shlex.split(command)
1080
1081 0         environ = os.environ.copy()
1082 0         if env:
1083 0             environ.update(env)
1084
1085 0         try:
1086 0             process = await asyncio.create_subprocess_exec(
1087                 *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1088                 env=environ
1089             )
1090
1091             # wait for command terminate
1092 0             stdout, stderr = await process.communicate()
1093
1094 0             return_code = process.returncode
1095
1096 0             output = ""
1097 0             if stdout:
1098 0                 output = stdout.decode("utf-8").strip()
1099                 # output = stdout.decode()
1100 0             if stderr:
1101 0                 output = stderr.decode("utf-8").strip()
1102                 # output = stderr.decode()
1103
1104 0             if return_code != 0 and show_error_log:
1105 0                 self.log.debug(
1106                     "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1107                 )
1108             else:
1109 0                 self.log.debug("Return code: {}".format(return_code))
1110
1111 0             if raise_exception_on_error and return_code != 0:
1112 0                 raise K8sException(output)
1113
1114 0             if encode_utf8:
1115 0                 output = output.encode("utf-8").strip()
1116 0                 output = str(output).replace("\\n", "\n")
1117
1118 0             return output, return_code
1119
1120 0         except asyncio.CancelledError:
1121 0             raise
1122 0         except K8sException:
1123 0             raise
1124 0         except Exception as e:
1125 0             msg = "Exception executing command: {} -> {}".format(command, e)
1126 0             self.log.error(msg)
1127 0             if raise_exception_on_error:
1128 0                 raise K8sException(e) from e
1129             else:
1130 0                 return "", -1
1131
1132 1     async def _local_async_exec_pipe(self,
1133                                      command1: str,
1134                                      command2: str,
1135                                      raise_exception_on_error: bool = True,
1136                                      show_error_log: bool = True,
1137                                      encode_utf8: bool = False,
1138                                      env: dict = None):
1139
1140 0         command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1141 0         command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1142 0         command = "{} | {}".format(command1, command2)
1143 0         self.log.debug("Executing async local command: {}, env: {}".format(command, env))
1144
1145         # split command
1146 0         command1 = shlex.split(command1)
1147 0         command2 = shlex.split(command2)
1148
1149 0         environ = os.environ.copy()
1150 0         if env:
1151 0             environ.update(env)
1152
1153 0         try:
1154 0             read, write = os.pipe()
1155 0             await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1156 0             os.close(write)
1157 0             process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
1158                                                              stdout=asyncio.subprocess.PIPE,
1159                                                              env=environ)
1160 0             os.close(read)
1161 0             stdout, stderr = await process_2.communicate()
1162
1163 0             return_code = process_2.returncode
1164
1165 0             output = ""
1166 0             if stdout:
1167 0                 output = stdout.decode("utf-8").strip()
1168                 # output = stdout.decode()
1169 0             if stderr:
1170 0                 output = stderr.decode("utf-8").strip()
1171                 # output = stderr.decode()
1172
1173 0             if return_code != 0 and show_error_log:
1174 0                 self.log.debug(
1175                     "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1176                 )
1177             else:
1178 0                 self.log.debug("Return code: {}".format(return_code))
1179
1180 0             if raise_exception_on_error and return_code != 0:
1181 0                 raise K8sException(output)
1182
1183 0             if encode_utf8:
1184 0                 output = output.encode("utf-8").strip()
1185 0                 output = str(output).replace("\\n", "\n")
1186
1187 0             return output, return_code
1188 0         except asyncio.CancelledError:
1189 0             raise
1190 0         except K8sException:
1191 0             raise
1192 0         except Exception as e:
1193 0             msg = "Exception executing command: {} -> {}".format(command, e)
1194 0             self.log.error(msg)
1195 0             if raise_exception_on_error:
1196 0                 raise K8sException(e) from e
1197             else:
1198 0                 return "", -1
1199
1200 1     async def _get_service(self, cluster_id, service_name, namespace):
1201         """
1202         Obtains the data of the specified service in the k8cluster.
1203
1204         :param cluster_id: id of a K8s cluster known by OSM
1205         :param service_name: name of the K8s service in the specified namespace
1206         :param namespace: K8s namespace used by the KDU instance
1207         :return: If successful, it will return a service with the following data:
1208         - `name` of the service
1209         - `type` type of service in the k8 cluster
1210         - `ports` List of ports offered by the service, for each port includes at least
1211         name, port, protocol
1212         - `cluster_ip` Internal ip to be used inside k8s cluster
1213         - `external_ip` List of external ips (in case they are available)
1214         """
1215
1216         # init config, env
1217 1         paths, env = self._init_paths_env(
1218             cluster_name=cluster_id, create_if_not_exist=True
1219         )
1220
1221 1         command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1222             self.kubectl_command, paths["kube_config"], namespace, service_name
1223         )
1224
1225 1         output, _rc = await self._local_async_exec(
1226             command=command, raise_exception_on_error=True, env=env
1227         )
1228
1229 1         data = yaml.load(output, Loader=yaml.SafeLoader)
1230
1231 1         service = {
1232             "name": service_name,
1233             "type": self._get_deep(data, ("spec", "type")),
1234             "ports": self._get_deep(data, ("spec", "ports")),
1235             "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
1236         }
1237 1         if service["type"] == "LoadBalancer":
1238 0             ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1239 0             ip_list = [elem["ip"] for elem in ip_map_list]
1240 0             service["external_ip"] = ip_list
1241
1242 1         return service
1243
1244 1     async def _exec_inspect_comand(
1245         self, inspect_command: str, kdu_model: str, repo_url: str = None
1246     ):
1247         """
1248         Obtains information about a kdu, no cluster (no env)
1249         """
1250
1251 1         repo_str = ""
1252 1         if repo_url:
1253 1             repo_str = " --repo {}".format(repo_url)
1254
1255 1         idx = kdu_model.find("/")
1256 1         if idx >= 0:
1257 1             idx += 1
1258 1             kdu_model = kdu_model[idx:]
1259
1260 1         version = ""
1261 1         if ":" in kdu_model:
1262 1             parts = kdu_model.split(sep=":")
1263 1             if len(parts) == 2:
1264 1                 version = "--version {}".format(str(parts[1]))
1265 1                 kdu_model = parts[0]
1266
1267 1         full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
1268 1         output, _rc = await self._local_async_exec(
1269             command=full_command, encode_utf8=True
1270         )
1271
1272 1         return output
1273
1274 1     async def _store_status(
1275         self,
1276         cluster_id: str,
1277         operation: str,
1278         kdu_instance: str,
1279         namespace: str = None,
1280         check_every: float = 10,
1281         db_dict: dict = None,
1282         run_once: bool = False,
1283     ):
1284 1         while True:
1285 1             try:
1286 1                 await asyncio.sleep(check_every)
1287 1                 detailed_status = await self._status_kdu(
1288                     cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
1289                     return_text=False
1290                 )
1291 1                 status = detailed_status.get("info").get("description")
1292 1                 self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
1293                 # write status to db
1294 1                 result = await self.write_app_status_to_db(
1295                     db_dict=db_dict,
1296                     status=str(status),
1297                     detailed_status=str(detailed_status),
1298                     operation=operation,
1299                 )
1300 1                 if not result:
1301 0                     self.log.info("Error writing in database. Task exiting...")
1302 0                     return
1303 0             except asyncio.CancelledError:
1304 0                 self.log.debug("Task cancelled")
1305 0                 return
1306 0             except Exception as e:
1307 0                 self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
1308 0                 pass
1309             finally:
1310 1                 if run_once:
1311 1                     return
1312
1313     # params for use in -f file
1314     # returns values file option and filename (in order to delete it at the end)
1315 1     def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1316
1317 1         if params and len(params) > 0:
1318 0             self._init_paths_env(
1319                 cluster_name=cluster_id, create_if_not_exist=True
1320             )
1321
1322 0             def get_random_number():
1323 0                 r = random.randrange(start=1, stop=99999999)
1324 0                 s = str(r)
1325 0                 while len(s) < 10:
1326 0                     s = "0" + s
1327 0                 return s
1328
1329 0             params2 = dict()
1330 0             for key in params:
1331 0                 value = params.get(key)
1332 0                 if "!!yaml" in str(value):
1333 0                     value = yaml.load(value[7:])
1334 0                 params2[key] = value
1335
1336 0             values_file = get_random_number() + ".yaml"
1337 0             with open(values_file, "w") as stream:
1338 0                 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1339
1340 0             return "-f {}".format(values_file), values_file
1341
1342 1         return "", None
1343
1344     # params for use in --set option
1345 1     @staticmethod
1346 1     def _params_to_set_option(params: dict) -> str:
1347 0         params_str = ""
1348 0         if params and len(params) > 0:
1349 0             start = True
1350 0             for key in params:
1351 0                 value = params.get(key, None)
1352 0                 if value is not None:
1353 0                     if start:
1354 0                         params_str += "--set "
1355 0                         start = False
1356                     else:
1357 0                         params_str += ","
1358 0                     params_str += "{}={}".format(key, value)
1359 0         return params_str
1360
1361 1     @staticmethod
1362     def generate_kdu_instance_name(**kwargs):
1363 0         chart_name = kwargs["kdu_model"]
1364         # check embeded chart (file or dir)
1365 0         if chart_name.startswith("/"):
1366             # extract file or directory name
1367 0             chart_name = chart_name[chart_name.rfind("/") + 1:]
1368         # check URL
1369 0         elif "://" in chart_name:
1370             # extract last portion of URL
1371 0             chart_name = chart_name[chart_name.rfind("/") + 1:]
1372
1373 0         name = ""
1374 0         for c in chart_name:
1375 0             if c.isalpha() or c.isnumeric():
1376 0                 name += c
1377             else:
1378 0                 name += "-"
1379 0         if len(name) > 35:
1380 0             name = name[0:35]
1381
1382         # if does not start with alpha character, prefix 'a'
1383 0         if not name[0].isalpha():
1384 0             name = "a" + name
1385
1386 0         name += "-"
1387
1388 0         def get_random_number():
1389 0             r = random.randrange(start=1, stop=99999999)
1390 0             s = str(r)
1391 0             s = s.rjust(10, "0")
1392 0             return s
1393
1394 0         name = name + get_random_number()
1395 0         return name.lower()