blob: b0d8e6d52624a034a9aa328e9870867d273fac88 [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleira3e9293d2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
David Garcia4395cfa2021-05-28 16:21:51 +020073 self.config = EnvironConfig()
lloretgalleg1c83f2e2020-10-22 09:12:35 +000074 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
David Garcia4395cfa2021-05-28 16:21:51 +020089 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
lloretgalleg83e55892020-12-17 12:42:11 +000092
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +010093 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000094 """
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +010095 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +000098 """
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +010099
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000105
106 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100107 self,
108 k8s_creds: str,
109 namespace: str = "kube-system",
110 reuse_cluster_uuid=None,
111 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000112 ) -> (str, bool):
113 """
114 It prepares a given K8s cluster environment to run Charts
115
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 '.kube/config'
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200121 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
125 """
126
127 if reuse_cluster_uuid:
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100128 cluster_id = reuse_cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000129 else:
130 cluster_id = str(uuid4())
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000131
garciadeblas82b591c2021-03-24 09:22:13 +0100132 self.log.debug(
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
134 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000135
136 paths, env = self._init_paths_env(
137 cluster_name=cluster_id, create_if_not_exist=True
138 )
139 mode = stat.S_IRUSR | stat.S_IWUSR
140 with open(paths["kube_config"], "w", mode) as f:
141 f.write(k8s_creds)
142 os.chmod(paths["kube_config"], 0o600)
143
144 # Code with initialization specific of helm version
145 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
146
147 # sync fs with local data
148 self.fs.reverse_sync(from_path=cluster_id)
149
150 self.log.info("Cluster {} initialized".format(cluster_id))
151
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100152 return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000153
154 async def repo_add(
garciadeblas82b591c2021-03-24 09:22:13 +0100155 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000156 ):
garciadeblas82b591c2021-03-24 09:22:13 +0100157 self.log.debug(
158 "Cluster {}, adding {} repository {}. URL: {}".format(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100159 cluster_uuid, repo_type, name, url
garciadeblas82b591c2021-03-24 09:22:13 +0100160 )
161 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000162
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000163 # init_env
164 paths, env = self._init_paths_env(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100165 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000166 )
167
bravof53dd7462021-11-17 11:14:57 -0300168 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100169 self.fs.sync(from_path=cluster_uuid)
bravof53dd7462021-11-17 11:14:57 -0300170
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000171 # helm repo add name url
bravof53dd7462021-11-17 11:14:57 -0300172 command = "env KUBECONFIG={} {} repo add {} {}".format(
173 paths["kube_config"], self._helm_command, name, url
174 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000175 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100176 await self._local_async_exec(
177 command=command, raise_exception_on_error=True, env=env
178 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000179
garciadeblasdfb1eb62022-05-04 10:57:36 +0200180 # helm repo update
garciadeblasfedcd652022-05-04 11:07:41 +0200181 command = "env KUBECONFIG={} {} repo update {}".format(
182 paths["kube_config"], self._helm_command, name
garciadeblasdfb1eb62022-05-04 10:57:36 +0200183 )
184 self.log.debug("updating repo: {}".format(command))
185 await self._local_async_exec(
186 command=command, raise_exception_on_error=False, env=env
187 )
188
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000189 # sync fs
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100190 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000191
garciadeblas468002b2022-04-08 22:53:25 +0200192 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
193 self.log.debug(
194 "Cluster {}, updating {} repository {}".format(
195 cluster_uuid, repo_type, name
196 )
197 )
198
199 # init_env
200 paths, env = self._init_paths_env(
201 cluster_name=cluster_uuid, create_if_not_exist=True
202 )
203
204 # sync local dir
205 self.fs.sync(from_path=cluster_uuid)
206
207 # helm repo update
208 command = "{} repo update {}".format(self._helm_command, name)
209 self.log.debug("updating repo: {}".format(command))
210 await self._local_async_exec(
211 command=command, raise_exception_on_error=False, env=env
212 )
213
214 # sync fs
215 self.fs.reverse_sync(from_path=cluster_uuid)
216
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000217 async def repo_list(self, cluster_uuid: str) -> list:
218 """
219 Get the list of registered repositories
220
221 :return: list of registered repositories: [ (name, url) .... ]
222 """
223
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100224 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000225
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000226 # config filename
227 paths, env = self._init_paths_env(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100228 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000229 )
230
bravof53dd7462021-11-17 11:14:57 -0300231 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100232 self.fs.sync(from_path=cluster_uuid)
bravof53dd7462021-11-17 11:14:57 -0300233
234 command = "env KUBECONFIG={} {} repo list --output yaml".format(
235 paths["kube_config"], self._helm_command
236 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000237
238 # Set exception to false because if there are no repos just want an empty list
239 output, _rc = await self._local_async_exec(
240 command=command, raise_exception_on_error=False, env=env
241 )
242
243 # sync fs
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100244 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000245
246 if _rc == 0:
247 if output and len(output) > 0:
248 repos = yaml.load(output, Loader=yaml.SafeLoader)
249 # unify format between helm2 and helm3 setting all keys lowercase
250 return self._lower_keys_list(repos)
251 else:
252 return []
253 else:
254 return []
255
256 async def repo_remove(self, cluster_uuid: str, name: str):
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100257 self.log.debug(
258 "remove {} repositories for cluster {}".format(name, cluster_uuid)
259 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000260
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000261 # init env, paths
262 paths, env = self._init_paths_env(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100263 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000264 )
265
bravof53dd7462021-11-17 11:14:57 -0300266 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100267 self.fs.sync(from_path=cluster_uuid)
bravof53dd7462021-11-17 11:14:57 -0300268
269 command = "env KUBECONFIG={} {} repo remove {}".format(
270 paths["kube_config"], self._helm_command, name
271 )
garciadeblas82b591c2021-03-24 09:22:13 +0100272 await self._local_async_exec(
273 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000274 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000275
276 # sync fs
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100277 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000278
279 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100280 self,
281 cluster_uuid: str,
282 force: bool = False,
283 uninstall_sw: bool = False,
284 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000285 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200286 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000287
David Garciaeb8943a2021-04-12 12:07:37 +0200288 Resets the Kubernetes cluster by removing the helm deployment that represents it.
289
290 :param cluster_uuid: The UUID of the cluster to reset
291 :param force: Boolean to force the reset
292 :param uninstall_sw: Boolean to force the reset
293 :param kwargs: Additional parameters (None yet)
294 :return: Returns True if successful or raises an exception.
295 """
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100296 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100297 self.log.debug(
298 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100299 cluster_uuid, uninstall_sw
garciadeblas82b591c2021-03-24 09:22:13 +0100300 )
301 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000302
303 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100304 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000305
306 # uninstall releases if needed.
307 if uninstall_sw:
308 releases = await self.instances_list(cluster_uuid=cluster_uuid)
309 if len(releases) > 0:
310 if force:
311 for r in releases:
312 try:
313 kdu_instance = r.get("name")
314 chart = r.get("chart")
315 self.log.debug(
316 "Uninstalling {} -> {}".format(chart, kdu_instance)
317 )
318 await self.uninstall(
319 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
320 )
321 except Exception as e:
322 # will not raise exception as it was found
323 # that in some cases of previously installed helm releases it
324 # raised an error
325 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100326 "Error uninstalling release {}: {}".format(
327 kdu_instance, e
328 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000329 )
330 else:
331 msg = (
332 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100333 ).format(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000334 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100335 uninstall_sw = (
336 False # Allow to remove k8s cluster without removing Tiller
337 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000338
339 if uninstall_sw:
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100340 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000341
342 # delete cluster directory
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100343 self.log.debug("Removing directory {}".format(cluster_uuid))
344 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000345 # Remove also local directorio if still exist
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100346 direct = self.fs.path + "/" + cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000347 shutil.rmtree(direct, ignore_errors=True)
348
349 return True
350
lloretgalleg095392b2020-11-20 11:28:08 +0000351 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100352 self,
353 cluster_id: str,
354 kdu_model: str,
355 paths: dict,
356 env: dict,
357 kdu_instance: str,
358 atomic: bool = True,
359 timeout: float = 300,
360 params: dict = None,
361 db_dict: dict = None,
362 kdu_name: str = None,
363 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000364 ):
bravof53dd7462021-11-17 11:14:57 -0300365 # init env, paths
366 paths, env = self._init_paths_env(
367 cluster_name=cluster_id, create_if_not_exist=True
368 )
369
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000370 # params to str
371 params_str, file_to_delete = self._params_to_file_option(
372 cluster_id=cluster_id, params=params
373 )
374
375 # version
376 version = None
377 if ":" in kdu_model:
378 parts = kdu_model.split(sep=":")
379 if len(parts) == 2:
380 version = str(parts[1])
381 kdu_model = parts[0]
382
garciadeblas468002b2022-04-08 22:53:25 +0200383 repo = self._split_repo(kdu_model)
384 if repo:
385 self.repo_update(cluster_id, repo)
386
garciadeblas82b591c2021-03-24 09:22:13 +0100387 command = self._get_install_command(
bravof53dd7462021-11-17 11:14:57 -0300388 kdu_model,
389 kdu_instance,
390 namespace,
391 params_str,
392 version,
393 atomic,
394 timeout,
395 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100396 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000397
398 self.log.debug("installing: {}".format(command))
399
400 if atomic:
401 # exec helm in a task
402 exec_task = asyncio.ensure_future(
403 coro_or_future=self._local_async_exec(
404 command=command, raise_exception_on_error=False, env=env
405 )
406 )
407
408 # write status in another task
409 status_task = asyncio.ensure_future(
410 coro_or_future=self._store_status(
411 cluster_id=cluster_id,
412 kdu_instance=kdu_instance,
413 namespace=namespace,
414 db_dict=db_dict,
415 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000416 )
417 )
418
419 # wait for execution task
420 await asyncio.wait([exec_task])
421
422 # cancel status task
423 status_task.cancel()
424
425 output, rc = exec_task.result()
426
427 else:
428
429 output, rc = await self._local_async_exec(
430 command=command, raise_exception_on_error=False, env=env
431 )
432
433 # remove temporal values yaml file
434 if file_to_delete:
435 os.remove(file_to_delete)
436
437 # write final status
438 await self._store_status(
439 cluster_id=cluster_id,
440 kdu_instance=kdu_instance,
441 namespace=namespace,
442 db_dict=db_dict,
443 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000444 )
445
446 if rc != 0:
447 msg = "Error executing command: {}\nOutput: {}".format(command, output)
448 self.log.error(msg)
449 raise K8sException(msg)
450
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000451 async def upgrade(
452 self,
453 cluster_uuid: str,
454 kdu_instance: str,
455 kdu_model: str = None,
456 atomic: bool = True,
457 timeout: float = 300,
458 params: dict = None,
459 db_dict: dict = None,
460 ):
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100461 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000462
463 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100464 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000465
466 # look for instance to obtain namespace
467 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
468 if not instance_info:
469 raise K8sException("kdu_instance {} not found".format(kdu_instance))
470
471 # init env, paths
472 paths, env = self._init_paths_env(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100473 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000474 )
475
bravof53dd7462021-11-17 11:14:57 -0300476 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100477 self.fs.sync(from_path=cluster_uuid)
bravof53dd7462021-11-17 11:14:57 -0300478
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000479 # params to str
480 params_str, file_to_delete = self._params_to_file_option(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100481 cluster_id=cluster_uuid, params=params
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000482 )
483
484 # version
485 version = None
486 if ":" in kdu_model:
487 parts = kdu_model.split(sep=":")
488 if len(parts) == 2:
489 version = str(parts[1])
490 kdu_model = parts[0]
491
garciadeblas468002b2022-04-08 22:53:25 +0200492 repo = self._split_repo(kdu_model)
493 if repo:
494 self.repo_update(cluster_uuid, repo)
495
garciadeblas82b591c2021-03-24 09:22:13 +0100496 command = self._get_upgrade_command(
497 kdu_model,
498 kdu_instance,
499 instance_info["namespace"],
500 params_str,
501 version,
502 atomic,
503 timeout,
bravof53dd7462021-11-17 11:14:57 -0300504 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100505 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000506
507 self.log.debug("upgrading: {}".format(command))
508
509 if atomic:
510
511 # exec helm in a task
512 exec_task = asyncio.ensure_future(
513 coro_or_future=self._local_async_exec(
514 command=command, raise_exception_on_error=False, env=env
515 )
516 )
517 # write status in another task
518 status_task = asyncio.ensure_future(
519 coro_or_future=self._store_status(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100520 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000521 kdu_instance=kdu_instance,
522 namespace=instance_info["namespace"],
523 db_dict=db_dict,
524 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000525 )
526 )
527
528 # wait for execution task
529 await asyncio.wait([exec_task])
530
531 # cancel status task
532 status_task.cancel()
533 output, rc = exec_task.result()
534
535 else:
536
537 output, rc = await self._local_async_exec(
538 command=command, raise_exception_on_error=False, env=env
539 )
540
541 # remove temporal values yaml file
542 if file_to_delete:
543 os.remove(file_to_delete)
544
545 # write final status
546 await self._store_status(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100547 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000548 kdu_instance=kdu_instance,
549 namespace=instance_info["namespace"],
550 db_dict=db_dict,
551 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000552 )
553
554 if rc != 0:
555 msg = "Error executing command: {}\nOutput: {}".format(command, output)
556 self.log.error(msg)
557 raise K8sException(msg)
558
559 # sync fs
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100560 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000561
562 # return new revision number
563 instance = await self.get_instance_info(
564 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
565 )
566 if instance:
567 revision = int(instance.get("revision"))
568 self.log.debug("New revision: {}".format(revision))
569 return revision
570 else:
571 return 0
572
aktas2962f3e2021-03-15 11:05:35 +0300573 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100574 self,
575 kdu_instance: str,
576 scale: int,
577 resource_name: str,
578 total_timeout: float = 1800,
579 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300580 ):
581 raise NotImplementedError("Method not implemented")
582
583 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100584 self,
585 resource_name: str,
586 kdu_instance: str,
587 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300588 ):
589 raise NotImplementedError("Method not implemented")
590
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000591 async def rollback(
592 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
593 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000594 self.log.debug(
595 "rollback kdu_instance {} to revision {} from cluster {}".format(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100596 kdu_instance, revision, cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000597 )
598 )
599
600 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100601 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000602
603 # look for instance to obtain namespace
604 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
605 if not instance_info:
606 raise K8sException("kdu_instance {} not found".format(kdu_instance))
607
608 # init env, paths
609 paths, env = self._init_paths_env(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100610 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000611 )
612
bravof53dd7462021-11-17 11:14:57 -0300613 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100614 self.fs.sync(from_path=cluster_uuid)
bravof53dd7462021-11-17 11:14:57 -0300615
garciadeblas82b591c2021-03-24 09:22:13 +0100616 command = self._get_rollback_command(
bravof53dd7462021-11-17 11:14:57 -0300617 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
garciadeblas82b591c2021-03-24 09:22:13 +0100618 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000619
620 self.log.debug("rolling_back: {}".format(command))
621
622 # exec helm in a task
623 exec_task = asyncio.ensure_future(
624 coro_or_future=self._local_async_exec(
625 command=command, raise_exception_on_error=False, env=env
626 )
627 )
628 # write status in another task
629 status_task = asyncio.ensure_future(
630 coro_or_future=self._store_status(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100631 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000632 kdu_instance=kdu_instance,
633 namespace=instance_info["namespace"],
634 db_dict=db_dict,
635 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000636 )
637 )
638
639 # wait for execution task
640 await asyncio.wait([exec_task])
641
642 # cancel status task
643 status_task.cancel()
644
645 output, rc = exec_task.result()
646
647 # write final status
648 await self._store_status(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100649 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000650 kdu_instance=kdu_instance,
651 namespace=instance_info["namespace"],
652 db_dict=db_dict,
653 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000654 )
655
656 if rc != 0:
657 msg = "Error executing command: {}\nOutput: {}".format(command, output)
658 self.log.error(msg)
659 raise K8sException(msg)
660
661 # sync fs
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100662 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000663
664 # return new revision number
665 instance = await self.get_instance_info(
666 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
667 )
668 if instance:
669 revision = int(instance.get("revision"))
670 self.log.debug("New revision: {}".format(revision))
671 return revision
672 else:
673 return 0
674
David Garciaeb8943a2021-04-12 12:07:37 +0200675 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000676 """
677 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
678 (this call should happen after all _terminate-config-primitive_ of the VNF
679 are invoked).
680
681 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
682 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200683 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000684 :return: True if successful
685 """
686
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000687 self.log.debug(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100688 "uninstall kdu_instance {} from cluster {}".format(
689 kdu_instance, cluster_uuid
690 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000691 )
692
693 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100694 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000695
696 # look for instance to obtain namespace
697 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
698 if not instance_info:
David Garcia7add1872021-08-18 14:52:52 +0200699 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
700 return True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000701 # init env, paths
702 paths, env = self._init_paths_env(
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100703 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000704 )
705
bravof53dd7462021-11-17 11:14:57 -0300706 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100707 self.fs.sync(from_path=cluster_uuid)
bravof53dd7462021-11-17 11:14:57 -0300708
709 command = self._get_uninstall_command(
710 kdu_instance, instance_info["namespace"], paths["kube_config"]
711 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000712 output, _rc = await self._local_async_exec(
713 command=command, raise_exception_on_error=True, env=env
714 )
715
716 # sync fs
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100717 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000718
719 return self._output_to_table(output)
720
721 async def instances_list(self, cluster_uuid: str) -> list:
722 """
723 returns a list of deployed releases in a cluster
724
725 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
726 :return:
727 """
728
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100729 self.log.debug("list releases for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000730
731 # sync local dir
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100732 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000733
734 # execute internal command
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100735 result = await self._instances_list(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000736
737 # sync fs
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +0100738 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000739
740 return result
741
742 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
743 instances = await self.instances_list(cluster_uuid=cluster_uuid)
744 for instance in instances:
745 if instance.get("name") == kdu_instance:
746 return instance
747 self.log.debug("Instance {} not found".format(kdu_instance))
748 return None
749
750 async def exec_primitive(
751 self,
752 cluster_uuid: str = None,
753 kdu_instance: str = None,
754 primitive_name: str = None,
755 timeout: float = 300,
756 params: dict = None,
757 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200758 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000759 ) -> str:
760 """Exec primitive (Juju action)
761
762 :param cluster_uuid: The UUID of the cluster or namespace:cluster
763 :param kdu_instance: The unique name of the KDU instance
764 :param primitive_name: Name of action that will be executed
765 :param timeout: Timeout for action execution
766 :param params: Dictionary of all the parameters needed for the action
767 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200768 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000769
770 :return: Returns the output of the action
771 """
772 raise K8sException(
773 "KDUs deployed with Helm don't support actions "
774 "different from rollback, upgrade and status"
775 )
776
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Collect the K8s services that belong to a KDU instance.

    The names of the services are obtained through the helm version dependent
    ``_get_services`` and each of them is then queried with ``_get_service``.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, a list of services. Each service can have the
        following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at
            least name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # only the paths are used here, the helm environment is not needed
    config_paths, _helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # refresh the local copy of the cluster directory before using it
    self.fs.sync(from_path=cluster_uuid)

    names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, config_paths["kube_config"]
    )

    # services are queried one after another, in the order they were reported
    service_list = [
        await self._get_service(cluster_uuid, name, namespace) for name in names
    ]

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
824
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of a single K8s service.

    The local cluster directory is synchronized before and after the query, which
    is delegated to ``_get_service``.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace where the service lives
    :return: the service data as built by ``_get_service``
    """
    debug_msg = "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
        service_name, namespace, cluster_uuid
    )
    self.log.debug(debug_msg)

    self.fs.sync(from_path=cluster_uuid)
    found = await self._get_service(cluster_uuid, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return found
844
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
) -> Union[str, dict]:
    """
    This call retrieves the current state of a given KDU instance. It allows to
    retrieve the _composition_ (i.e. K8s objects) and _specific values_ of the
    configuration parameters applied to a given instance. This call is based on
    the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :param yaml_format: if the return shall be returned as an YAML string or as a
                            dictionary
    :return: If successful, it will return the following vector of arguments:
    - K8s `namespace` in the cluster where the KDU lives
    - `state` of the KDU instance. It can be:
          - UNKNOWN
          - DEPLOYED
          - DELETED
          - SUPERSEDED
          - FAILED or
          - DELETING
    - List of `resources` (objects) that this release consists of, sorted by kind,
      and the status of those resources
    - Last `deployment_time`.
    :raises K8sException: if no instance named ``kdu_instance`` is found in the
        cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    instances = await self._instances_list(cluster_id=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            break
    else:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
907
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a KDU model.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL where the chart is looked up
    :return: output of the inspect command run with the "values" subcommand
    """
    debug_msg = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    values = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return values
919
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a KDU model.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL where the chart is looked up
    :return: output of the inspect command run with the "readme" subcommand
    """
    debug_msg = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
929
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repos configured for a cluster with the ones in the database.

    Repos obtained through ``_get_db_repos_dict`` are added when they are not
    present locally, and removed and added again when their URL differs. Local
    repos that are not in the database are removed, except the one named
    "stable"; a failure removing one of those is only logged.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :return: tuple ``(deleted_repo_list, added_repo_dict)``.
        ``deleted_repo_list`` holds the database ``_id`` of each repo removed
        because its URL changed and the *name* of each local repo removed because
        it is not in the database. ``added_repo_dict`` maps the database ``_id``
        of each added repo to its name
    :raises K8sException: if a database repo cannot be added (or replaced)
    :raises Exception: on any other error while synchronizing
    """
    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # resolved before the try block so that the error message below can
            # never refer to an unbound id or to the id of a previous repo
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and add again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repo that cannot be removed is only reported
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # unexpected errors are logged and reported to the caller
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
995
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000996 def _get_db_repos_dict(self, repo_ids: list):
997 db_repos_dict = {}
998 for repo_id in repo_ids:
999 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1000 db_repos_dict[db_repo["name"]] = db_repo
1001 return db_repos_dict
1002
1003 """
1004 ####################################################################################
1005 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1006 ####################################################################################
1007 """
1008
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    Also creates helm3 dirs according to the new directory specification; those
    paths are not returned but assigned to helm environment variables.

    :param cluster_name: cluster_name
    :param create_if_not_exist: NOTE(review): presumably controls whether missing
        directories are created - confirm in the subclasses
    :return: Dictionary with config_paths and dictionary with helm environment
        variables. Callers in this class read the "kube_config" entry of the
        first dictionary and pass the second one as environment of the commands
    """
1019
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    NOTE(review): ``paths`` and ``env`` look like the two values returned by
    ``_init_paths_env`` - confirm against the caller.
    """
1025
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    :param cluster_id: id of the cluster whose instances are listed
    :return: iterable of instances; ``status_kdu`` reads the "name" and
        "namespace" entries of each one
    """
1031
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    :param cluster_id: id of the cluster where the instance is deployed
    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace used by the instance
    :param kubeconfig: kube config path; ``get_services`` passes the "kube_config"
        entry of the paths returned by ``_init_paths_env``
    :return: iterable of service names; ``get_services`` queries each one with
        ``_get_service``
    """
1037
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict]:
    """
    Implements the helm version dependent method to obtain status of a helm instance

    :param cluster_id: id of the cluster where the instance is deployed
    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace of the instance
    :param yaml_format: if the status shall be returned as a YAML string or as a
        dictionary
    :param show_error_log: NOTE(review): presumably controls whether a failing
        status command is logged - confirm in the subclasses
    :return: the status of the instance. With ``yaml_format=False``,
        ``_store_status`` reads the "description" entry of its "info" entry
    """
1050
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance

    The meaning of each parameter is defined by the helm version dependent
    implementation.
    """
1066
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    The signature mirrors ``_get_install_command``; the meaning of each parameter
    is defined by the helm version dependent implementation.
    """
1082
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    The meaning of each parameter is defined by the helm version dependent
    implementation.
    """
1090
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    The meaning of each parameter is defined by the helm version dependent
    implementation.
    """
1098
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain command to be executed to obtain information about the kdu

    :param show_command: inspect subcommand; this class uses "values" and "readme"
    :param kdu_model: chart name, as prepared by ``_exec_inspect_comand`` (repo
        prefix and version suffix removed)
    :param repo_str: " --repo <url>" option, or an empty string
    :param version: "--version <version>" option, or an empty string
    :return: the command, which ``_exec_inspect_comand`` runs with
        ``_local_async_exec``
    """
1106
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    of helm version
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called
    """
1115
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    :param cluster_uuid: UUID of the cluster
    :return: list of repo identifiers; ``synchronize_repos`` hands it to
        ``_get_db_repos_dict``, which uses each one as the ``_id`` of a
        "k8srepos" record
    """
1121
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001122 """
1123 ####################################################################################
1124 ################################### P R I V A T E ##################################
1125 ####################################################################################
1126 """
1127
1128 @staticmethod
1129 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1130 if os.path.exists(filename):
1131 return True
1132 else:
1133 msg = "File {} does not exist".format(filename)
1134 if exception_if_not_exists:
1135 raise K8sException(msg)
1136
1137 @staticmethod
1138 def _remove_multiple_spaces(strobj):
1139 strobj = strobj.strip()
1140 while " " in strobj:
1141 strobj = strobj.replace(" ", " ")
1142 return strobj
1143
1144 @staticmethod
1145 def _output_to_lines(output: str) -> list:
1146 output_lines = list()
1147 lines = output.splitlines(keepends=False)
1148 for line in lines:
1149 line = line.strip()
1150 if len(line) > 0:
1151 output_lines.append(line)
1152 return output_lines
1153
1154 @staticmethod
1155 def _output_to_table(output: str) -> list:
1156 output_table = list()
1157 lines = output.splitlines(keepends=False)
1158 for line in lines:
1159 line = line.replace("\t", " ")
1160 line_list = list()
1161 output_table.append(line_list)
1162 cells = line.split(sep=" ")
1163 for cell in cells:
1164 cell = cell.strip()
1165 if len(cell) > 0:
1166 line_list.append(cell)
1167 return output_table
1168
1169 @staticmethod
1170 def _parse_services(output: str) -> list:
1171 lines = output.splitlines(keepends=False)
1172 services = []
1173 for line in lines:
1174 line = line.replace("\t", " ")
1175 cells = line.split(sep=" ")
1176 if len(cells) > 0 and cells[0].startswith("service/"):
1177 elems = cells[0].split(sep="/")
1178 if len(elems) > 1:
1179 services.append(elems[1])
1180 return services
1181
1182 @staticmethod
1183 def _get_deep(dictionary: dict, members: tuple):
1184 target = dictionary
1185 value = None
1186 try:
1187 for m in members:
1188 value = target.get(m)
1189 if not value:
1190 return None
1191 else:
1192 target = value
1193 except Exception:
1194 pass
1195 return value
1196
1197 # find key:value in several lines
1198 @staticmethod
1199 def _find_in_lines(p_lines: list, p_key: str) -> str:
1200 for line in p_lines:
1201 try:
1202 if line.startswith(p_key + ":"):
1203 parts = line.split(":")
1204 the_value = parts[1].strip()
1205 return the_value
1206 except Exception:
1207 # ignore it
1208 pass
1209 return None
1210
1211 @staticmethod
1212 def _lower_keys_list(input_list: list):
1213 """
1214 Transform the keys in a list of dictionaries to lower case and returns a new list
1215 of dictionaries
1216 """
1217 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001218 if input_list:
1219 for dictionary in input_list:
1220 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1221 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001222 return new_list
1223
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command (without a shell) and return its output and return code.

    :param command: command line; it is normalized with
        ``_remove_multiple_spaces`` and split with ``shlex.split``
    :param raise_exception_on_error: raise instead of returning when the command
        cannot be run or ends with a non-zero return code
    :param show_error_log: log the output when the return code is not zero
    :param encode_utf8: return ``str()`` of the utf-8 encoded output, with the
        escaped newlines of that representation turned into real newlines
    :param env: variables added to (or overriding) the environment of this
        process for the command
    :return: tuple (output, return code). The output is the stripped stdout,
        replaced by the stripped stderr whenever stderr is not empty. If the
        command cannot be run and ``raise_exception_on_error`` is not set,
        ``("", -1)`` is returned
    :raises K8sException: see ``raise_exception_on_error``
    """
    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # from here on ``command`` is the argument list
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for the command to terminate
        stdout, stderr = await process.communicate()
        return_code = process.returncode

        # stderr is looked at last, so it wins when both streams have content
        output = ""
        for captured in (stdout, stderr):
            if captured:
                output = captured.decode("utf-8").strip()

        failed = return_code != 0
        if failed and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if failed and raise_exception_on_error:
            raise K8sException(output)

        if encode_utf8:
            encoded = output.encode("utf-8").strip()
            output = str(encoded).replace("\\n", "\n")

        return output, return_code

    except (asyncio.CancelledError, K8sException):
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        return "", -1
1293
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Run ``command1 | command2`` locally (without a shell).

    The standard output of ``command1`` is connected to the standard input of
    ``command2`` through an OS pipe. Only the standard output of ``command2`` is
    captured: the standard error of both commands is not redirected, and the
    return code of ``command1`` is not checked.

    :param command1: command line producing the data
    :param command2: command line consuming the data
    :param raise_exception_on_error: raise instead of returning when the commands
        cannot be run or ``command2`` ends with a non-zero return code
    :param show_error_log: log the output when the return code is not zero
    :param encode_utf8: return ``str()`` of the utf-8 encoded output, with the
        escaped newlines of that representation turned into real newlines
    :param env: variables added to (or overriding) the environment of this
        process for both commands
    :return: tuple (stripped stdout of ``command2``, return code of
        ``command2``), or ``("", -1)`` if the commands cannot be run and
        ``raise_exception_on_error`` is not set
    :raises K8sException: see ``raise_exception_on_error``
    """
    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read, write = os.pipe()
        try:
            try:
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
            finally:
                # this process must not keep the write end open, otherwise
                # command2 would never see the end of its input
                os.close(write)
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
            )
        finally:
            # closed on every path so that a failed spawn does not leak it
            os.close(read)

        # stderr is not piped, so only stdout is returned by communicate()
        stdout, _ = await process_2.communicate()
        # do not leave the producer unattended once the consumer is done
        await process_1.wait()

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1365
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; it is empty while no ingress
    address has been assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        # the ingress list is missing until the load balancer is provisioned,
        # and an ingress entry may carry only a hostname instead of an ip
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list or [] if "ip" in elem
        ]

    return service
1409
async def _exec_inspect_comand(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Obtains information about a kdu, no cluster (no env)

    :param inspect_command: inspect subcommand handed to ``_get_inspect_command``
    :param kdu_model: chart reference, optionally prefixed with "<repo>/" and
        suffixed with ":<version>"
    :param repo_url: optional repository URL, turned into a " --repo" option
    :return: output of the command, as returned by ``_local_async_exec`` with
        ``encode_utf8=True``
    """
    repo_str = " --repo {}".format(repo_url) if repo_url else ""

    # drop everything up to (and including) the first "/"
    _prefix, slash, remainder = kdu_model.partition("/")
    if slash:
        kdu_model = remainder

    # exactly one ":" separates chart name and version; with any other number
    # of ":" the model is left untouched and no version is requested
    version = ""
    if kdu_model.count(":") == 1:
        kdu_model, chart_version = kdu_model.split(sep=":")
        version = "--version {}".format(str(chart_version))

    full_command = self._get_inspect_command(
        inspect_command, kdu_model, repo_str, version
    )
    output, _rc = await self._local_async_exec(
        command=full_command, encode_utf8=True
    )

    return output
1441
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    db_dict: dict = None,
) -> None:
    """
    Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

    Any error (including the cancellation of the task) is logged as a warning and
    not propagated.

    :param cluster_id (str): the cluster where the KDU instance is deployed
    :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
    :param kdu_instance (str): The KDU instance in relation to which the status is obtained
    :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
    :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
    values for the keys:
        - "collection": The Mongo DB collection to write to
        - "filter": The query filter to use in the update process
        - "path": The dot separated keys which targets the object to be updated
    Defaults to None.
    """

    try:
        detailed_status = await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            yaml_format=False,
            namespace=namespace,
        )

        # the short status is the description found in the "info" section
        kdu_info = detailed_status.get("info")
        status = kdu_info.get("description")
        self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

        # write status to db
        written = await self.write_app_status_to_db(
            db_dict=db_dict,
            status=str(status),
            detailed_status=str(detailed_status),
            operation=operation,
        )
        if not written:
            self.log.info("Error writing in database. Task exiting...")

    except asyncio.CancelledError as e:
        self.log.warning(
            f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
        )
    except Exception as e:
        self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001493
1494 # params for use in -f file
1495 # returns values file option and filename (in order to delete it at the end)
1496 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1497
1498 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001499 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001500
1501 def get_random_number():
1502 r = random.randrange(start=1, stop=99999999)
1503 s = str(r)
1504 while len(s) < 10:
1505 s = "0" + s
1506 return s
1507
1508 params2 = dict()
1509 for key in params:
1510 value = params.get(key)
1511 if "!!yaml" in str(value):
1512 value = yaml.load(value[7:])
1513 params2[key] = value
1514
1515 values_file = get_random_number() + ".yaml"
1516 with open(values_file, "w") as stream:
1517 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1518
1519 return "-f {}".format(values_file), values_file
1520
1521 return "", None
1522
1523 # params for use in --set option
1524 @staticmethod
1525 def _params_to_set_option(params: dict) -> str:
1526 params_str = ""
1527 if params and len(params) > 0:
1528 start = True
1529 for key in params:
1530 value = params.get(key, None)
1531 if value is not None:
1532 if start:
1533 params_str += "--set "
1534 start = False
1535 else:
1536 params_str += ","
1537 params_str += "{}={}".format(key, value)
1538 return params_str
1539
1540 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01001541 def generate_kdu_instance_name(**kwargs):
1542 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001543 # check embeded chart (file or dir)
1544 if chart_name.startswith("/"):
1545 # extract file or directory name
David Garcia4ae527e2021-07-26 16:04:59 +02001546 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001547 # check URL
1548 elif "://" in chart_name:
1549 # extract last portion of URL
David Garcia4ae527e2021-07-26 16:04:59 +02001550 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001551
1552 name = ""
1553 for c in chart_name:
1554 if c.isalpha() or c.isnumeric():
1555 name += c
1556 else:
1557 name += "-"
1558 if len(name) > 35:
1559 name = name[0:35]
1560
1561 # if does not start with alpha character, prefix 'a'
1562 if not name[0].isalpha():
1563 name = "a" + name
1564
1565 name += "-"
1566
1567 def get_random_number():
1568 r = random.randrange(start=1, stop=99999999)
1569 s = str(r)
1570 s = s.rjust(10, "0")
1571 return s
1572
1573 name = name + get_random_number()
1574 return name.lower()
garciadeblas468002b2022-04-08 22:53:25 +02001575
1576 def _split_version(self, kdu_model: str) -> (str, str):
1577 version = None
1578 if ":" in kdu_model:
1579 parts = kdu_model.split(sep=":")
1580 if len(parts) == 2:
1581 version = str(parts[1])
1582 kdu_model = parts[0]
1583 return kdu_model, version
1584
1585 async def _split_repo(self, kdu_model: str) -> str:
1586 repo_name = None
1587 idx = kdu_model.find("/")
1588 if idx >= 0:
1589 repo_name = kdu_model[:idx]
1590 return repo_name
1591
1592 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1593 repo_url = None
1594 idx = kdu_model.find("/")
1595 if idx >= 0:
1596 repo_name = kdu_model[:idx]
1597 # Find repository link
1598 local_repo_list = await self.repo_list(cluster_uuid)
1599 for repo in local_repo_list:
1600 repo_url = repo["url"] if repo["name"] == repo_name else None
1601 return repo_url