blob: 149b064ba5ed57fb5fbccac5ae7705eba074d07f [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleirad901a802022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garciadd322062021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
David Garciadd322062021-05-28 16:21:51 +020073 self.config = EnvironConfig()
lloretgalleg1c83f2e2020-10-22 09:12:35 +000074 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
David Garciadd322062021-05-28 16:21:51 +020089 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
lloretgalleg83e55892020-12-17 12:42:11 +000092
David Garcia2a10e432022-06-17 14:27:54 +020093 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
lloretgalleg1c83f2e2020-10-22 09:12:35 +000095 """
David Garcia2a10e432022-06-17 14:27:54 +020096 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
lloretgalleg1c83f2e2020-10-22 09:12:35 +000098 """
David Garcia2a10e432022-06-17 14:27:54 +020099 namespace, _, cluster_id = cluster_uuid.rpartition(":")
100 return namespace, cluster_id
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000101
102 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100103 self,
104 k8s_creds: str,
105 namespace: str = "kube-system",
106 reuse_cluster_uuid=None,
107 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000108 ) -> (str, bool):
109 """
110 It prepares a given K8s cluster environment to run Charts
111
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
113 '.kube/config'
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200117 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
121 """
122
123 if reuse_cluster_uuid:
David Garcia2a10e432022-06-17 14:27:54 +0200124 namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
125 namespace = namespace_ or namespace
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000126 else:
127 cluster_id = str(uuid4())
David Garcia2a10e432022-06-17 14:27:54 +0200128 cluster_uuid = "{}:{}".format(namespace, cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000129
garciadeblas82b591c2021-03-24 09:22:13 +0100130 self.log.debug(
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
132 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000133
134 paths, env = self._init_paths_env(
135 cluster_name=cluster_id, create_if_not_exist=True
136 )
137 mode = stat.S_IRUSR | stat.S_IWUSR
138 with open(paths["kube_config"], "w", mode) as f:
139 f.write(k8s_creds)
140 os.chmod(paths["kube_config"], 0o600)
141
142 # Code with initialization specific of helm version
143 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
144
145 # sync fs with local data
146 self.fs.reverse_sync(from_path=cluster_id)
147
148 self.log.info("Cluster {} initialized".format(cluster_id))
149
David Garcia2a10e432022-06-17 14:27:54 +0200150 return cluster_uuid, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000151
152 async def repo_add(
garciadeblas82b591c2021-03-24 09:22:13 +0100153 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000154 ):
David Garcia2a10e432022-06-17 14:27:54 +0200155 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100156 self.log.debug(
157 "Cluster {}, adding {} repository {}. URL: {}".format(
David Garcia2a10e432022-06-17 14:27:54 +0200158 cluster_id, repo_type, name, url
garciadeblas82b591c2021-03-24 09:22:13 +0100159 )
160 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000161
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000162 # init_env
163 paths, env = self._init_paths_env(
David Garcia2a10e432022-06-17 14:27:54 +0200164 cluster_name=cluster_id, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000165 )
166
David Garcia05bccf72022-02-02 11:35:20 +0100167 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200168 self.fs.sync(from_path=cluster_id)
David Garcia05bccf72022-02-02 11:35:20 +0100169
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000170 # helm repo add name url
David Garcia05bccf72022-02-02 11:35:20 +0100171 command = "env KUBECONFIG={} {} repo add {} {}".format(
172 paths["kube_config"], self._helm_command, name, url
173 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000174 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100175 await self._local_async_exec(
176 command=command, raise_exception_on_error=True, env=env
177 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000178
garciadeblas13cbdb72022-05-04 10:57:36 +0200179 # helm repo update
garciadeblasff9a4ce2022-05-04 11:07:41 +0200180 command = "env KUBECONFIG={} {} repo update {}".format(
181 paths["kube_config"], self._helm_command, name
garciadeblas13cbdb72022-05-04 10:57:36 +0200182 )
183 self.log.debug("updating repo: {}".format(command))
184 await self._local_async_exec(
185 command=command, raise_exception_on_error=False, env=env
186 )
187
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000188 # sync fs
limon96d0d3d2022-07-21 13:19:44 +0200189 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000190
limon96d0d3d2022-07-21 13:19:44 +0200191 async def repo_update(self, cluster_id: str, name: str, repo_type: str = "chart"):
garciadeblasd3399352022-04-08 22:53:25 +0200192 self.log.debug(
limon96d0d3d2022-07-21 13:19:44 +0200193 "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name)
garciadeblasd3399352022-04-08 22:53:25 +0200194 )
195
196 # init_env
197 paths, env = self._init_paths_env(
limon96d0d3d2022-07-21 13:19:44 +0200198 cluster_name=cluster_id, create_if_not_exist=True
garciadeblasd3399352022-04-08 22:53:25 +0200199 )
200
201 # sync local dir
limon96d0d3d2022-07-21 13:19:44 +0200202 self.fs.sync(from_path=cluster_id)
garciadeblasd3399352022-04-08 22:53:25 +0200203
204 # helm repo update
205 command = "{} repo update {}".format(self._helm_command, name)
206 self.log.debug("updating repo: {}".format(command))
207 await self._local_async_exec(
208 command=command, raise_exception_on_error=False, env=env
209 )
210
211 # sync fs
limon96d0d3d2022-07-21 13:19:44 +0200212 self.fs.reverse_sync(from_path=cluster_id)
garciadeblasd3399352022-04-08 22:53:25 +0200213
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000214 async def repo_list(self, cluster_uuid: str) -> list:
215 """
216 Get the list of registered repositories
217
218 :return: list of registered repositories: [ (name, url) .... ]
219 """
220
David Garcia2a10e432022-06-17 14:27:54 +0200221 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
222 self.log.debug("list repositories for cluster {}".format(cluster_id))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000223
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000224 # config filename
225 paths, env = self._init_paths_env(
David Garcia2a10e432022-06-17 14:27:54 +0200226 cluster_name=cluster_id, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000227 )
228
David Garcia05bccf72022-02-02 11:35:20 +0100229 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200230 self.fs.sync(from_path=cluster_id)
David Garcia05bccf72022-02-02 11:35:20 +0100231
232 command = "env KUBECONFIG={} {} repo list --output yaml".format(
233 paths["kube_config"], self._helm_command
234 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000235
236 # Set exception to false because if there are no repos just want an empty list
237 output, _rc = await self._local_async_exec(
238 command=command, raise_exception_on_error=False, env=env
239 )
240
241 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200242 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000243
244 if _rc == 0:
245 if output and len(output) > 0:
246 repos = yaml.load(output, Loader=yaml.SafeLoader)
247 # unify format between helm2 and helm3 setting all keys lowercase
248 return self._lower_keys_list(repos)
249 else:
250 return []
251 else:
252 return []
253
254 async def repo_remove(self, cluster_uuid: str, name: str):
David Garcia2a10e432022-06-17 14:27:54 +0200255 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
256 self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000257
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000258 # init env, paths
259 paths, env = self._init_paths_env(
David Garcia2a10e432022-06-17 14:27:54 +0200260 cluster_name=cluster_id, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000261 )
262
David Garcia05bccf72022-02-02 11:35:20 +0100263 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200264 self.fs.sync(from_path=cluster_id)
David Garcia05bccf72022-02-02 11:35:20 +0100265
266 command = "env KUBECONFIG={} {} repo remove {}".format(
267 paths["kube_config"], self._helm_command, name
268 )
garciadeblas82b591c2021-03-24 09:22:13 +0100269 await self._local_async_exec(
270 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000271 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000272
273 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200274 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000275
276 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100277 self,
278 cluster_uuid: str,
279 force: bool = False,
280 uninstall_sw: bool = False,
281 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000282 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200283 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000284
David Garciaeb8943a2021-04-12 12:07:37 +0200285 Resets the Kubernetes cluster by removing the helm deployment that represents it.
286
287 :param cluster_uuid: The UUID of the cluster to reset
288 :param force: Boolean to force the reset
289 :param uninstall_sw: Boolean to force the reset
290 :param kwargs: Additional parameters (None yet)
291 :return: Returns True if successful or raises an exception.
292 """
David Garcia2a10e432022-06-17 14:27:54 +0200293 namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100294 self.log.debug(
295 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
David Garcia2a10e432022-06-17 14:27:54 +0200296 cluster_id, uninstall_sw
garciadeblas82b591c2021-03-24 09:22:13 +0100297 )
298 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000299
300 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200301 self.fs.sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000302
303 # uninstall releases if needed.
304 if uninstall_sw:
305 releases = await self.instances_list(cluster_uuid=cluster_uuid)
306 if len(releases) > 0:
307 if force:
308 for r in releases:
309 try:
310 kdu_instance = r.get("name")
311 chart = r.get("chart")
312 self.log.debug(
313 "Uninstalling {} -> {}".format(chart, kdu_instance)
314 )
315 await self.uninstall(
316 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
317 )
318 except Exception as e:
319 # will not raise exception as it was found
320 # that in some cases of previously installed helm releases it
321 # raised an error
322 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100323 "Error uninstalling release {}: {}".format(
324 kdu_instance, e
325 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000326 )
327 else:
328 msg = (
329 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
David Garcia2a10e432022-06-17 14:27:54 +0200330 ).format(cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000331 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100332 uninstall_sw = (
333 False # Allow to remove k8s cluster without removing Tiller
334 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000335
336 if uninstall_sw:
David Garcia2a10e432022-06-17 14:27:54 +0200337 await self._uninstall_sw(cluster_id, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000338
339 # delete cluster directory
David Garcia2a10e432022-06-17 14:27:54 +0200340 self.log.debug("Removing directory {}".format(cluster_id))
341 self.fs.file_delete(cluster_id, ignore_non_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000342 # Remove also local directorio if still exist
David Garcia2a10e432022-06-17 14:27:54 +0200343 direct = self.fs.path + "/" + cluster_id
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000344 shutil.rmtree(direct, ignore_errors=True)
345
346 return True
347
lloretgalleg095392b2020-11-20 11:28:08 +0000348 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100349 self,
350 cluster_id: str,
351 kdu_model: str,
352 paths: dict,
353 env: dict,
354 kdu_instance: str,
355 atomic: bool = True,
356 timeout: float = 300,
357 params: dict = None,
358 db_dict: dict = None,
359 kdu_name: str = None,
360 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000361 ):
David Garcia05bccf72022-02-02 11:35:20 +0100362 # init env, paths
363 paths, env = self._init_paths_env(
364 cluster_name=cluster_id, create_if_not_exist=True
365 )
366
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000367 # params to str
368 params_str, file_to_delete = self._params_to_file_option(
369 cluster_id=cluster_id, params=params
370 )
371
372 # version
373 version = None
374 if ":" in kdu_model:
375 parts = kdu_model.split(sep=":")
376 if len(parts) == 2:
377 version = str(parts[1])
378 kdu_model = parts[0]
379
garciadeblasd3399352022-04-08 22:53:25 +0200380 repo = self._split_repo(kdu_model)
381 if repo:
limon0fbea822022-07-21 13:55:55 +0200382 await self.repo_update(cluster_id, repo)
garciadeblasd3399352022-04-08 22:53:25 +0200383
garciadeblas82b591c2021-03-24 09:22:13 +0100384 command = self._get_install_command(
David Garcia05bccf72022-02-02 11:35:20 +0100385 kdu_model,
386 kdu_instance,
387 namespace,
388 params_str,
389 version,
390 atomic,
391 timeout,
392 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100393 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000394
395 self.log.debug("installing: {}".format(command))
396
397 if atomic:
398 # exec helm in a task
399 exec_task = asyncio.ensure_future(
400 coro_or_future=self._local_async_exec(
401 command=command, raise_exception_on_error=False, env=env
402 )
403 )
404
405 # write status in another task
406 status_task = asyncio.ensure_future(
407 coro_or_future=self._store_status(
408 cluster_id=cluster_id,
409 kdu_instance=kdu_instance,
410 namespace=namespace,
411 db_dict=db_dict,
412 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000413 )
414 )
415
416 # wait for execution task
417 await asyncio.wait([exec_task])
418
419 # cancel status task
420 status_task.cancel()
421
422 output, rc = exec_task.result()
423
424 else:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000425 output, rc = await self._local_async_exec(
426 command=command, raise_exception_on_error=False, env=env
427 )
428
429 # remove temporal values yaml file
430 if file_to_delete:
431 os.remove(file_to_delete)
432
433 # write final status
434 await self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000440 )
441
442 if rc != 0:
443 msg = "Error executing command: {}\nOutput: {}".format(command, output)
444 self.log.error(msg)
445 raise K8sException(msg)
446
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000447 async def upgrade(
448 self,
449 cluster_uuid: str,
450 kdu_instance: str,
451 kdu_model: str = None,
452 atomic: bool = True,
453 timeout: float = 300,
454 params: dict = None,
455 db_dict: dict = None,
456 ):
David Garcia2a10e432022-06-17 14:27:54 +0200457 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
458 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000459
460 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200461 self.fs.sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000462
463 # look for instance to obtain namespace
464 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
465 if not instance_info:
466 raise K8sException("kdu_instance {} not found".format(kdu_instance))
467
468 # init env, paths
469 paths, env = self._init_paths_env(
David Garcia2a10e432022-06-17 14:27:54 +0200470 cluster_name=cluster_id, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000471 )
472
David Garcia05bccf72022-02-02 11:35:20 +0100473 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200474 self.fs.sync(from_path=cluster_id)
David Garcia05bccf72022-02-02 11:35:20 +0100475
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000476 # params to str
477 params_str, file_to_delete = self._params_to_file_option(
David Garcia2a10e432022-06-17 14:27:54 +0200478 cluster_id=cluster_id, params=params
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000479 )
480
481 # version
482 version = None
483 if ":" in kdu_model:
484 parts = kdu_model.split(sep=":")
485 if len(parts) == 2:
486 version = str(parts[1])
487 kdu_model = parts[0]
488
garciadeblasd3399352022-04-08 22:53:25 +0200489 repo = self._split_repo(kdu_model)
490 if repo:
limon0fbea822022-07-21 13:55:55 +0200491 await self.repo_update(cluster_id, repo)
garciadeblasd3399352022-04-08 22:53:25 +0200492
garciadeblas82b591c2021-03-24 09:22:13 +0100493 command = self._get_upgrade_command(
494 kdu_model,
495 kdu_instance,
496 instance_info["namespace"],
497 params_str,
498 version,
499 atomic,
500 timeout,
David Garcia05bccf72022-02-02 11:35:20 +0100501 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100502 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000503
504 self.log.debug("upgrading: {}".format(command))
505
506 if atomic:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000507 # exec helm in a task
508 exec_task = asyncio.ensure_future(
509 coro_or_future=self._local_async_exec(
510 command=command, raise_exception_on_error=False, env=env
511 )
512 )
513 # write status in another task
514 status_task = asyncio.ensure_future(
515 coro_or_future=self._store_status(
David Garcia2a10e432022-06-17 14:27:54 +0200516 cluster_id=cluster_id,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000517 kdu_instance=kdu_instance,
518 namespace=instance_info["namespace"],
519 db_dict=db_dict,
520 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000521 )
522 )
523
524 # wait for execution task
525 await asyncio.wait([exec_task])
526
527 # cancel status task
528 status_task.cancel()
529 output, rc = exec_task.result()
530
531 else:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000532 output, rc = await self._local_async_exec(
533 command=command, raise_exception_on_error=False, env=env
534 )
535
536 # remove temporal values yaml file
537 if file_to_delete:
538 os.remove(file_to_delete)
539
540 # write final status
541 await self._store_status(
David Garcia2a10e432022-06-17 14:27:54 +0200542 cluster_id=cluster_id,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000543 kdu_instance=kdu_instance,
544 namespace=instance_info["namespace"],
545 db_dict=db_dict,
546 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000547 )
548
549 if rc != 0:
550 msg = "Error executing command: {}\nOutput: {}".format(command, output)
551 self.log.error(msg)
552 raise K8sException(msg)
553
554 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200555 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000556
557 # return new revision number
558 instance = await self.get_instance_info(
559 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
560 )
561 if instance:
562 revision = int(instance.get("revision"))
563 self.log.debug("New revision: {}".format(revision))
564 return revision
565 else:
566 return 0
567
aktas2962f3e2021-03-15 11:05:35 +0300568 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100569 self,
570 kdu_instance: str,
571 scale: int,
572 resource_name: str,
573 total_timeout: float = 1800,
574 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300575 ):
576 raise NotImplementedError("Method not implemented")
577
578 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100579 self,
580 resource_name: str,
581 kdu_instance: str,
582 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300583 ):
584 raise NotImplementedError("Method not implemented")
585
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000586 async def rollback(
587 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
588 ):
David Garcia2a10e432022-06-17 14:27:54 +0200589 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000590 self.log.debug(
591 "rollback kdu_instance {} to revision {} from cluster {}".format(
David Garcia2a10e432022-06-17 14:27:54 +0200592 kdu_instance, revision, cluster_id
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000593 )
594 )
595
596 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200597 self.fs.sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000598
599 # look for instance to obtain namespace
600 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
601 if not instance_info:
602 raise K8sException("kdu_instance {} not found".format(kdu_instance))
603
604 # init env, paths
605 paths, env = self._init_paths_env(
David Garcia2a10e432022-06-17 14:27:54 +0200606 cluster_name=cluster_id, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000607 )
608
David Garcia05bccf72022-02-02 11:35:20 +0100609 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200610 self.fs.sync(from_path=cluster_id)
David Garcia05bccf72022-02-02 11:35:20 +0100611
garciadeblas82b591c2021-03-24 09:22:13 +0100612 command = self._get_rollback_command(
David Garcia05bccf72022-02-02 11:35:20 +0100613 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
garciadeblas82b591c2021-03-24 09:22:13 +0100614 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000615
616 self.log.debug("rolling_back: {}".format(command))
617
618 # exec helm in a task
619 exec_task = asyncio.ensure_future(
620 coro_or_future=self._local_async_exec(
621 command=command, raise_exception_on_error=False, env=env
622 )
623 )
624 # write status in another task
625 status_task = asyncio.ensure_future(
626 coro_or_future=self._store_status(
David Garcia2a10e432022-06-17 14:27:54 +0200627 cluster_id=cluster_id,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000628 kdu_instance=kdu_instance,
629 namespace=instance_info["namespace"],
630 db_dict=db_dict,
631 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000632 )
633 )
634
635 # wait for execution task
636 await asyncio.wait([exec_task])
637
638 # cancel status task
639 status_task.cancel()
640
641 output, rc = exec_task.result()
642
643 # write final status
644 await self._store_status(
David Garcia2a10e432022-06-17 14:27:54 +0200645 cluster_id=cluster_id,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000646 kdu_instance=kdu_instance,
647 namespace=instance_info["namespace"],
648 db_dict=db_dict,
649 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000650 )
651
652 if rc != 0:
653 msg = "Error executing command: {}\nOutput: {}".format(command, output)
654 self.log.error(msg)
655 raise K8sException(msg)
656
657 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200658 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000659
660 # return new revision number
661 instance = await self.get_instance_info(
662 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
663 )
664 if instance:
665 revision = int(instance.get("revision"))
666 self.log.debug("New revision: {}".format(revision))
667 return revision
668 else:
669 return 0
670
David Garciaeb8943a2021-04-12 12:07:37 +0200671 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000672 """
673 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
674 (this call should happen after all _terminate-config-primitive_ of the VNF
675 are invoked).
676
677 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
678 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200679 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000680 :return: True if successful
681 """
682
David Garcia2a10e432022-06-17 14:27:54 +0200683 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000684 self.log.debug(
David Garcia2a10e432022-06-17 14:27:54 +0200685 "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000686 )
687
688 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200689 self.fs.sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000690
691 # look for instance to obtain namespace
692 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
693 if not instance_info:
David Garcia820b38a2021-08-18 14:52:52 +0200694 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
695 return True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000696 # init env, paths
697 paths, env = self._init_paths_env(
David Garcia2a10e432022-06-17 14:27:54 +0200698 cluster_name=cluster_id, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000699 )
700
David Garcia05bccf72022-02-02 11:35:20 +0100701 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200702 self.fs.sync(from_path=cluster_id)
David Garcia05bccf72022-02-02 11:35:20 +0100703
704 command = self._get_uninstall_command(
705 kdu_instance, instance_info["namespace"], paths["kube_config"]
706 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000707 output, _rc = await self._local_async_exec(
708 command=command, raise_exception_on_error=True, env=env
709 )
710
711 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200712 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000713
714 return self._output_to_table(output)
715
716 async def instances_list(self, cluster_uuid: str) -> list:
717 """
718 returns a list of deployed releases in a cluster
719
720 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
721 :return:
722 """
723
David Garcia2a10e432022-06-17 14:27:54 +0200724 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
725 self.log.debug("list releases for cluster {}".format(cluster_id))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000726
727 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200728 self.fs.sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000729
730 # execute internal command
David Garcia2a10e432022-06-17 14:27:54 +0200731 result = await self._instances_list(cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000732
733 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200734 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000735
736 return result
737
738 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
739 instances = await self.instances_list(cluster_uuid=cluster_uuid)
740 for instance in instances:
741 if instance.get("name") == kdu_instance:
742 return instance
743 self.log.debug("Instance {} not found".format(kdu_instance))
744 return None
745
746 async def exec_primitive(
747 self,
748 cluster_uuid: str = None,
749 kdu_instance: str = None,
750 primitive_name: str = None,
751 timeout: float = 300,
752 params: dict = None,
753 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200754 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000755 ) -> str:
756 """Exec primitive (Juju action)
757
758 :param cluster_uuid: The UUID of the cluster or namespace:cluster
759 :param kdu_instance: The unique name of the KDU instance
760 :param primitive_name: Name of action that will be executed
761 :param timeout: Timeout for action execution
762 :param params: Dictionary of all the parameters needed for the action
763 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200764 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000765
766 :return: Returns the output of the action
767 """
768 raise K8sException(
769 "KDUs deployed with Helm don't support actions "
770 "different from rollback, upgrade and status"
771 )
772
garciadeblas82b591c2021-03-24 09:22:13 +0100773 async def get_services(
774 self, cluster_uuid: str, kdu_instance: str, namespace: str
775 ) -> list:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000776 """
777 Returns a list of services defined for the specified kdu instance.
778
779 :param cluster_uuid: UUID of a K8s cluster known by OSM
780 :param kdu_instance: unique name for the KDU instance
781 :param namespace: K8s namespace used by the KDU instance
782 :return: If successful, it will return a list of services, Each service
783 can have the following data:
784 - `name` of the service
785 - `type` type of service in the k8 cluster
786 - `ports` List of ports offered by the service, for each port includes at least
787 name, port, protocol
788 - `cluster_ip` Internal ip to be used inside k8s cluster
789 - `external_ip` List of external ips (in case they are available)
790 """
791
David Garcia2a10e432022-06-17 14:27:54 +0200792 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000793 self.log.debug(
794 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
795 cluster_uuid, kdu_instance
796 )
797 )
798
David Garcia05bccf72022-02-02 11:35:20 +0100799 # init env, paths
800 paths, env = self._init_paths_env(
David Garcia2a10e432022-06-17 14:27:54 +0200801 cluster_name=cluster_id, create_if_not_exist=True
David Garcia05bccf72022-02-02 11:35:20 +0100802 )
803
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000804 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200805 self.fs.sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000806
807 # get list of services names for kdu
David Garcia05bccf72022-02-02 11:35:20 +0100808 service_names = await self._get_services(
David Garcia2a10e432022-06-17 14:27:54 +0200809 cluster_id, kdu_instance, namespace, paths["kube_config"]
David Garcia05bccf72022-02-02 11:35:20 +0100810 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000811
812 service_list = []
813 for service in service_names:
David Garcia2a10e432022-06-17 14:27:54 +0200814 service = await self._get_service(cluster_id, service, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000815 service_list.append(service)
816
817 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200818 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000819
820 return service_list
821
garciadeblas82b591c2021-03-24 09:22:13 +0100822 async def get_service(
823 self, cluster_uuid: str, service_name: str, namespace: str
824 ) -> object:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000825 self.log.debug(
826 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
garciadeblas82b591c2021-03-24 09:22:13 +0100827 service_name, namespace, cluster_uuid
828 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000829 )
830
David Garcia2a10e432022-06-17 14:27:54 +0200831 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000832
David Garcia2a10e432022-06-17 14:27:54 +0200833 # sync local dir
834 self.fs.sync(from_path=cluster_id)
835
836 service = await self._get_service(cluster_id, service_name, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000837
838 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200839 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000840
841 return service
842
Pedro Escaleirad901a802022-04-05 17:32:13 +0100843 async def status_kdu(
844 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
845 ) -> Union[str, dict]:
David Garciaeb8943a2021-04-12 12:07:37 +0200846 """
847 This call would retrieve tha current state of a given KDU instance. It would be
848 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
849 values_ of the configuration parameters applied to a given instance. This call
850 would be based on the `status` call.
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000851
David Garciaeb8943a2021-04-12 12:07:37 +0200852 :param cluster_uuid: UUID of a K8s cluster known by OSM
853 :param kdu_instance: unique name for the KDU instance
854 :param kwargs: Additional parameters (None yet)
Pedro Escaleirad901a802022-04-05 17:32:13 +0100855 :param yaml_format: if the return shall be returned as an YAML string or as a
856 dictionary
David Garciaeb8943a2021-04-12 12:07:37 +0200857 :return: If successful, it will return the following vector of arguments:
858 - K8s `namespace` in the cluster where the KDU lives
859 - `state` of the KDU instance. It can be:
860 - UNKNOWN
861 - DEPLOYED
862 - DELETED
863 - SUPERSEDED
864 - FAILED or
865 - DELETING
866 - List of `resources` (objects) that this release consists of, sorted by kind,
867 and the status of those resources
868 - Last `deployment_time`.
869
870 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000871 self.log.debug(
872 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
873 cluster_uuid, kdu_instance
874 )
875 )
876
David Garcia2a10e432022-06-17 14:27:54 +0200877 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
878
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000879 # sync local dir
David Garcia2a10e432022-06-17 14:27:54 +0200880 self.fs.sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000881
882 # get instance: needed to obtain namespace
David Garcia2a10e432022-06-17 14:27:54 +0200883 instances = await self._instances_list(cluster_id=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000884 for instance in instances:
885 if instance.get("name") == kdu_instance:
886 break
887 else:
888 # instance does not exist
garciadeblas82b591c2021-03-24 09:22:13 +0100889 raise K8sException(
890 "Instance name: {} not found in cluster: {}".format(
David Garcia2a10e432022-06-17 14:27:54 +0200891 kdu_instance, cluster_id
garciadeblas82b591c2021-03-24 09:22:13 +0100892 )
893 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000894
895 status = await self._status_kdu(
David Garcia2a10e432022-06-17 14:27:54 +0200896 cluster_id=cluster_id,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000897 kdu_instance=kdu_instance,
898 namespace=instance["namespace"],
Pedro Escaleirad901a802022-04-05 17:32:13 +0100899 yaml_format=yaml_format,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000900 show_error_log=True,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000901 )
902
903 # sync fs
David Garcia2a10e432022-06-17 14:27:54 +0200904 self.fs.reverse_sync(from_path=cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000905
906 return status
907
908 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000909 self.log.debug(
910 "inspect kdu_model values {} from (optional) repo: {}".format(
911 kdu_model, repo_url
912 )
913 )
914
915 return await self._exec_inspect_comand(
916 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
917 )
918
919 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000920 self.log.debug(
921 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
922 )
923
924 return await self._exec_inspect_comand(
925 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
926 )
927
928 async def synchronize_repos(self, cluster_uuid: str):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000929 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
930 try:
931 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
932 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
933
934 local_repo_list = await self.repo_list(cluster_uuid)
935 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
936
937 deleted_repo_list = []
938 added_repo_dict = {}
939
940 # iterate over the list of repos in the database that should be
941 # added if not present
942 for repo_name, db_repo in db_repo_dict.items():
943 try:
944 # check if it is already present
945 curr_repo_url = local_repo_dict.get(db_repo["name"])
946 repo_id = db_repo.get("_id")
947 if curr_repo_url != db_repo["url"]:
948 if curr_repo_url:
garciadeblas82b591c2021-03-24 09:22:13 +0100949 self.log.debug(
950 "repo {} url changed, delete and and again".format(
951 db_repo["url"]
952 )
953 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000954 await self.repo_remove(cluster_uuid, db_repo["name"])
955 deleted_repo_list.append(repo_id)
956
957 # add repo
958 self.log.debug("add repo {}".format(db_repo["name"]))
garciadeblas82b591c2021-03-24 09:22:13 +0100959 await self.repo_add(
960 cluster_uuid, db_repo["name"], db_repo["url"]
961 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000962 added_repo_dict[repo_id] = db_repo["name"]
963 except Exception as e:
964 raise K8sException(
965 "Error adding repo id: {}, err_msg: {} ".format(
966 repo_id, repr(e)
967 )
968 )
969
970 # Delete repos that are present but not in nbi_list
971 for repo_name in local_repo_dict:
972 if not db_repo_dict.get(repo_name) and repo_name != "stable":
973 self.log.debug("delete repo {}".format(repo_name))
974 try:
975 await self.repo_remove(cluster_uuid, repo_name)
976 deleted_repo_list.append(repo_name)
977 except Exception as e:
978 self.warning(
979 "Error deleting repo, name: {}, err_msg: {}".format(
980 repo_name, str(e)
981 )
982 )
983
984 return deleted_repo_list, added_repo_dict
985
986 except K8sException:
987 raise
988 except Exception as e:
989 # Do not raise errors synchronizing repos
990 self.log.error("Error synchronizing repos: {}".format(e))
991 raise Exception("Error synchronizing repos: {}".format(e))
992
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000993 def _get_db_repos_dict(self, repo_ids: list):
994 db_repos_dict = {}
995 for repo_id in repo_ids:
996 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
997 db_repos_dict[db_repo["name"]] = db_repo
998 return db_repos_dict
999
1000 """
1001 ####################################################################################
1002 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1003 ####################################################################################
1004 """
1005
1006 @abc.abstractmethod
1007 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1008 """
1009 Creates and returns base cluster and kube dirs and returns them.
1010 Also created helm3 dirs according to new directory specification, paths are
1011 not returned but assigned to helm environment variables
1012
1013 :param cluster_name: cluster_name
1014 :return: Dictionary with config_paths and dictionary with helm environment variables
1015 """
1016
1017 @abc.abstractmethod
1018 async def _cluster_init(self, cluster_id, namespace, paths, env):
1019 """
1020 Implements the helm version dependent cluster initialization
1021 """
1022
1023 @abc.abstractmethod
1024 async def _instances_list(self, cluster_id):
1025 """
1026 Implements the helm version dependent helm instances list
1027 """
1028
1029 @abc.abstractmethod
David Garcia05bccf72022-02-02 11:35:20 +01001030 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001031 """
1032 Implements the helm version dependent method to obtain services from a helm instance
1033 """
1034
1035 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001036 async def _status_kdu(
1037 self,
1038 cluster_id: str,
1039 kdu_instance: str,
1040 namespace: str = None,
Pedro Escaleirad901a802022-04-05 17:32:13 +01001041 yaml_format: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001042 show_error_log: bool = False,
Pedro Escaleirad901a802022-04-05 17:32:13 +01001043 ) -> Union[str, dict]:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001044 """
1045 Implements the helm version dependent method to obtain status of a helm instance
1046 """
1047
1048 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001049 def _get_install_command(
David Garcia05bccf72022-02-02 11:35:20 +01001050 self,
1051 kdu_model,
1052 kdu_instance,
1053 namespace,
1054 params_str,
1055 version,
1056 atomic,
1057 timeout,
1058 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001059 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001060 """
1061 Obtain command to be executed to delete the indicated instance
1062 """
1063
1064 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001065 def _get_upgrade_command(
David Garcia05bccf72022-02-02 11:35:20 +01001066 self,
1067 kdu_model,
1068 kdu_instance,
1069 namespace,
1070 params_str,
1071 version,
1072 atomic,
1073 timeout,
1074 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001075 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001076 """
1077 Obtain command to be executed to upgrade the indicated instance
1078 """
1079
1080 @abc.abstractmethod
David Garcia05bccf72022-02-02 11:35:20 +01001081 def _get_rollback_command(
1082 self, kdu_instance, namespace, revision, kubeconfig
1083 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001084 """
1085 Obtain command to be executed to rollback the indicated instance
1086 """
1087
1088 @abc.abstractmethod
David Garcia05bccf72022-02-02 11:35:20 +01001089 def _get_uninstall_command(
1090 self, kdu_instance: str, namespace: str, kubeconfig: str
1091 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001092 """
1093 Obtain command to be executed to delete the indicated instance
1094 """
1095
1096 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001097 def _get_inspect_command(
1098 self, show_command: str, kdu_model: str, repo_str: str, version: str
1099 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001100 """
1101 Obtain command to be executed to obtain information about the kdu
1102 """
1103
1104 @abc.abstractmethod
1105 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1106 """
1107 Method call to uninstall cluster software for helm. This method is dependent
1108 of helm version
1109 For Helm v2 it will be called when Tiller must be uninstalled
1110 For Helm v3 it does nothing and does not need to be callled
1111 """
1112
lloretgalleg095392b2020-11-20 11:28:08 +00001113 @abc.abstractmethod
1114 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1115 """
1116 Obtains the cluster repos identifiers
1117 """
1118
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001119 """
1120 ####################################################################################
1121 ################################### P R I V A T E ##################################
1122 ####################################################################################
1123 """
1124
1125 @staticmethod
1126 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1127 if os.path.exists(filename):
1128 return True
1129 else:
1130 msg = "File {} does not exist".format(filename)
1131 if exception_if_not_exists:
1132 raise K8sException(msg)
1133
1134 @staticmethod
1135 def _remove_multiple_spaces(strobj):
1136 strobj = strobj.strip()
1137 while " " in strobj:
1138 strobj = strobj.replace(" ", " ")
1139 return strobj
1140
1141 @staticmethod
1142 def _output_to_lines(output: str) -> list:
1143 output_lines = list()
1144 lines = output.splitlines(keepends=False)
1145 for line in lines:
1146 line = line.strip()
1147 if len(line) > 0:
1148 output_lines.append(line)
1149 return output_lines
1150
1151 @staticmethod
1152 def _output_to_table(output: str) -> list:
1153 output_table = list()
1154 lines = output.splitlines(keepends=False)
1155 for line in lines:
1156 line = line.replace("\t", " ")
1157 line_list = list()
1158 output_table.append(line_list)
1159 cells = line.split(sep=" ")
1160 for cell in cells:
1161 cell = cell.strip()
1162 if len(cell) > 0:
1163 line_list.append(cell)
1164 return output_table
1165
1166 @staticmethod
1167 def _parse_services(output: str) -> list:
1168 lines = output.splitlines(keepends=False)
1169 services = []
1170 for line in lines:
1171 line = line.replace("\t", " ")
1172 cells = line.split(sep=" ")
1173 if len(cells) > 0 and cells[0].startswith("service/"):
1174 elems = cells[0].split(sep="/")
1175 if len(elems) > 1:
1176 services.append(elems[1])
1177 return services
1178
1179 @staticmethod
1180 def _get_deep(dictionary: dict, members: tuple):
1181 target = dictionary
1182 value = None
1183 try:
1184 for m in members:
1185 value = target.get(m)
1186 if not value:
1187 return None
1188 else:
1189 target = value
1190 except Exception:
1191 pass
1192 return value
1193
1194 # find key:value in several lines
1195 @staticmethod
1196 def _find_in_lines(p_lines: list, p_key: str) -> str:
1197 for line in p_lines:
1198 try:
1199 if line.startswith(p_key + ":"):
1200 parts = line.split(":")
1201 the_value = parts[1].strip()
1202 return the_value
1203 except Exception:
1204 # ignore it
1205 pass
1206 return None
1207
1208 @staticmethod
1209 def _lower_keys_list(input_list: list):
1210 """
1211 Transform the keys in a list of dictionaries to lower case and returns a new list
1212 of dictionaries
1213 """
1214 new_list = []
David Garciadd322062021-05-28 16:21:51 +02001215 if input_list:
1216 for dictionary in input_list:
1217 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1218 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001219 return new_list
1220
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001221 async def _local_async_exec(
1222 self,
1223 command: str,
1224 raise_exception_on_error: bool = False,
1225 show_error_log: bool = True,
1226 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001227 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001228 ) -> (str, int):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001229 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001230 self.log.debug(
1231 "Executing async local command: {}, env: {}".format(command, env)
1232 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001233
1234 # split command
1235 command = shlex.split(command)
1236
1237 environ = os.environ.copy()
1238 if env:
1239 environ.update(env)
1240
1241 try:
1242 process = await asyncio.create_subprocess_exec(
garciadeblas82b591c2021-03-24 09:22:13 +01001243 *command,
1244 stdout=asyncio.subprocess.PIPE,
1245 stderr=asyncio.subprocess.PIPE,
1246 env=environ,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001247 )
1248
1249 # wait for command terminate
1250 stdout, stderr = await process.communicate()
1251
1252 return_code = process.returncode
1253
1254 output = ""
1255 if stdout:
1256 output = stdout.decode("utf-8").strip()
1257 # output = stdout.decode()
1258 if stderr:
1259 output = stderr.decode("utf-8").strip()
1260 # output = stderr.decode()
1261
1262 if return_code != 0 and show_error_log:
1263 self.log.debug(
1264 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1265 )
1266 else:
1267 self.log.debug("Return code: {}".format(return_code))
1268
1269 if raise_exception_on_error and return_code != 0:
1270 raise K8sException(output)
1271
1272 if encode_utf8:
1273 output = output.encode("utf-8").strip()
1274 output = str(output).replace("\\n", "\n")
1275
1276 return output, return_code
1277
1278 except asyncio.CancelledError:
1279 raise
1280 except K8sException:
1281 raise
1282 except Exception as e:
1283 msg = "Exception executing command: {} -> {}".format(command, e)
1284 self.log.error(msg)
1285 if raise_exception_on_error:
1286 raise K8sException(e) from e
1287 else:
1288 return "", -1
1289
garciadeblas82b591c2021-03-24 09:22:13 +01001290 async def _local_async_exec_pipe(
1291 self,
1292 command1: str,
1293 command2: str,
1294 raise_exception_on_error: bool = True,
1295 show_error_log: bool = True,
1296 encode_utf8: bool = False,
1297 env: dict = None,
1298 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001299 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1300 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1301 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001302 self.log.debug(
1303 "Executing async local command: {}, env: {}".format(command, env)
1304 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001305
1306 # split command
1307 command1 = shlex.split(command1)
1308 command2 = shlex.split(command2)
1309
1310 environ = os.environ.copy()
1311 if env:
1312 environ.update(env)
1313
1314 try:
1315 read, write = os.pipe()
1316 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1317 os.close(write)
garciadeblas82b591c2021-03-24 09:22:13 +01001318 process_2 = await asyncio.create_subprocess_exec(
1319 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1320 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001321 os.close(read)
1322 stdout, stderr = await process_2.communicate()
1323
1324 return_code = process_2.returncode
1325
1326 output = ""
1327 if stdout:
1328 output = stdout.decode("utf-8").strip()
1329 # output = stdout.decode()
1330 if stderr:
1331 output = stderr.decode("utf-8").strip()
1332 # output = stderr.decode()
1333
1334 if return_code != 0 and show_error_log:
1335 self.log.debug(
1336 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1337 )
1338 else:
1339 self.log.debug("Return code: {}".format(return_code))
1340
1341 if raise_exception_on_error and return_code != 0:
1342 raise K8sException(output)
1343
1344 if encode_utf8:
1345 output = output.encode("utf-8").strip()
1346 output = str(output).replace("\\n", "\n")
1347
1348 return output, return_code
1349 except asyncio.CancelledError:
1350 raise
1351 except K8sException:
1352 raise
1353 except Exception as e:
1354 msg = "Exception executing command: {} -> {}".format(command, e)
1355 self.log.error(msg)
1356 if raise_exception_on_error:
1357 raise K8sException(e) from e
1358 else:
1359 return "", -1
1360
1361 async def _get_service(self, cluster_id, service_name, namespace):
1362 """
1363 Obtains the data of the specified service in the k8cluster.
1364
1365 :param cluster_id: id of a K8s cluster known by OSM
1366 :param service_name: name of the K8s service in the specified namespace
1367 :param namespace: K8s namespace used by the KDU instance
1368 :return: If successful, it will return a service with the following data:
1369 - `name` of the service
1370 - `type` type of service in the k8 cluster
1371 - `ports` List of ports offered by the service, for each port includes at least
1372 name, port, protocol
1373 - `cluster_ip` Internal ip to be used inside k8s cluster
1374 - `external_ip` List of external ips (in case they are available)
1375 """
1376
1377 # init config, env
1378 paths, env = self._init_paths_env(
1379 cluster_name=cluster_id, create_if_not_exist=True
1380 )
1381
1382 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1383 self.kubectl_command, paths["kube_config"], namespace, service_name
1384 )
1385
1386 output, _rc = await self._local_async_exec(
1387 command=command, raise_exception_on_error=True, env=env
1388 )
1389
1390 data = yaml.load(output, Loader=yaml.SafeLoader)
1391
1392 service = {
1393 "name": service_name,
1394 "type": self._get_deep(data, ("spec", "type")),
1395 "ports": self._get_deep(data, ("spec", "ports")),
garciadeblas82b591c2021-03-24 09:22:13 +01001396 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001397 }
1398 if service["type"] == "LoadBalancer":
1399 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1400 ip_list = [elem["ip"] for elem in ip_map_list]
1401 service["external_ip"] = ip_list
1402
1403 return service
1404
1405 async def _exec_inspect_comand(
1406 self, inspect_command: str, kdu_model: str, repo_url: str = None
1407 ):
1408 """
1409 Obtains information about a kdu, no cluster (no env)
1410 """
1411
1412 repo_str = ""
1413 if repo_url:
1414 repo_str = " --repo {}".format(repo_url)
1415
1416 idx = kdu_model.find("/")
1417 if idx >= 0:
1418 idx += 1
1419 kdu_model = kdu_model[idx:]
1420
1421 version = ""
1422 if ":" in kdu_model:
1423 parts = kdu_model.split(sep=":")
1424 if len(parts) == 2:
1425 version = "--version {}".format(str(parts[1]))
1426 kdu_model = parts[0]
1427
garciadeblas82b591c2021-03-24 09:22:13 +01001428 full_command = self._get_inspect_command(
1429 inspect_command, kdu_model, repo_str, version
1430 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001431 output, _rc = await self._local_async_exec(
1432 command=full_command, encode_utf8=True
1433 )
1434
1435 return output
1436
1437 async def _store_status(
1438 self,
1439 cluster_id: str,
1440 operation: str,
1441 kdu_instance: str,
1442 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001443 db_dict: dict = None,
Pedro Escaleira3356a402022-04-23 19:55:45 +01001444 ) -> None:
1445 """
1446 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1447
1448 :param cluster_id (str): the cluster where the KDU instance is deployed
1449 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1450 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1451 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1452 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1453 values for the keys:
1454 - "collection": The Mongo DB collection to write to
1455 - "filter": The query filter to use in the update process
1456 - "path": The dot separated keys which targets the object to be updated
1457 Defaults to None.
1458 """
1459
1460 try:
1461 detailed_status = await self._status_kdu(
1462 cluster_id=cluster_id,
1463 kdu_instance=kdu_instance,
1464 yaml_format=False,
1465 namespace=namespace,
1466 )
1467
1468 status = detailed_status.get("info").get("description")
1469 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1470
1471 # write status to db
1472 result = await self.write_app_status_to_db(
1473 db_dict=db_dict,
1474 status=str(status),
1475 detailed_status=str(detailed_status),
1476 operation=operation,
1477 )
1478
1479 if not result:
1480 self.log.info("Error writing in database. Task exiting...")
1481
1482 except asyncio.CancelledError as e:
1483 self.log.warning(
1484 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1485 )
1486 except Exception as e:
1487 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001488
1489 # params for use in -f file
1490 # returns values file option and filename (in order to delete it at the end)
1491 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001492 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001493 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001494
1495 def get_random_number():
1496 r = random.randrange(start=1, stop=99999999)
1497 s = str(r)
1498 while len(s) < 10:
1499 s = "0" + s
1500 return s
1501
1502 params2 = dict()
1503 for key in params:
1504 value = params.get(key)
1505 if "!!yaml" in str(value):
1506 value = yaml.load(value[7:])
1507 params2[key] = value
1508
1509 values_file = get_random_number() + ".yaml"
1510 with open(values_file, "w") as stream:
1511 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1512
1513 return "-f {}".format(values_file), values_file
1514
1515 return "", None
1516
1517 # params for use in --set option
1518 @staticmethod
1519 def _params_to_set_option(params: dict) -> str:
1520 params_str = ""
1521 if params and len(params) > 0:
1522 start = True
1523 for key in params:
1524 value = params.get(key, None)
1525 if value is not None:
1526 if start:
1527 params_str += "--set "
1528 start = False
1529 else:
1530 params_str += ","
1531 params_str += "{}={}".format(key, value)
1532 return params_str
1533
1534 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01001535 def generate_kdu_instance_name(**kwargs):
1536 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001537 # check embeded chart (file or dir)
1538 if chart_name.startswith("/"):
1539 # extract file or directory name
David Garcia05bccf72022-02-02 11:35:20 +01001540 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001541 # check URL
1542 elif "://" in chart_name:
1543 # extract last portion of URL
David Garcia05bccf72022-02-02 11:35:20 +01001544 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001545
1546 name = ""
1547 for c in chart_name:
1548 if c.isalpha() or c.isnumeric():
1549 name += c
1550 else:
1551 name += "-"
1552 if len(name) > 35:
1553 name = name[0:35]
1554
1555 # if does not start with alpha character, prefix 'a'
1556 if not name[0].isalpha():
1557 name = "a" + name
1558
1559 name += "-"
1560
1561 def get_random_number():
1562 r = random.randrange(start=1, stop=99999999)
1563 s = str(r)
1564 s = s.rjust(10, "0")
1565 return s
1566
1567 name = name + get_random_number()
1568 return name.lower()
garciadeblasd3399352022-04-08 22:53:25 +02001569
limon0fbea822022-07-21 13:55:55 +02001570 def _split_repo(self, kdu_model: str) -> str:
garciadeblasd3399352022-04-08 22:53:25 +02001571 repo_name = None
1572 idx = kdu_model.find("/")
1573 if idx >= 0:
1574 repo_name = kdu_model[:idx]
1575 return repo_name