blob: ccf9e33b3322fc91e7437d61fd2858d95603c664 [file] [log] [blame]
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
22import abc
23import asyncio
24import random
25import time
26import shlex
27import shutil
28import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000029import os
30import yaml
31from uuid import uuid4
32
David Garcia4395cfa2021-05-28 16:21:51 +020033from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000034from n2vc.exceptions import K8sException
35from n2vc.k8s_conn import K8sConnector
36
37
38class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
garciadeblas82b591c2021-03-24 09:22:13 +010045
lloretgalleg1c83f2e2020-10-22 09:12:35 +000046 service_account = "osm"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Set up the state shared by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """
    # base connector initialization goes first: self.log is used right after it
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # seed the random generator (random numbers for release name generation)
    random.seed(time.time())

    # the file system
    self.fs = fs

    # both executables are mandatory: the check is asked to raise when missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url comes from config; the literal string "None" means unset
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
lloretgalleg83e55892020-12-17 12:42:11 +000091
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +010092 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000093 """
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +010094 Obtains the namespace used by the cluster with the uuid passed by argument
95
96 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +000097 """
Pedro Escaleiraeba0ac32022-04-02 00:44:08 +010098
99 # first, obtain the cluster corresponding to the uuid passed by argument
100 k8scluster = self.db.get_one(
101 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
102 )
103 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000104
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """

    # reuse the given uuid when provided, otherwise generate a new one
    cluster_id = reuse_cluster_uuid if reuse_cluster_uuid else str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The credentials file must be owner read/write only from the moment it is
    # created. The permission bits have to go through os.open(): the third
    # positional argument of the builtin open() is the buffering size, not the
    # file mode.
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(
        paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode
    )
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # a file that already existed keeps its old permissions: restrict it too
    os.chmod(paths["kube_config"], mode)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000152
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository for the given cluster.

    Runs "helm repo update" (errors ignored) followed by "helm repo add"
    (errors raised), syncing the cluster directory before and after.

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name given to the repository
    :param url: url of the repository
    :param repo_type: only used in the log message here
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    command = "env KUBECONFIG={} {} repo update".format(
        paths["kube_config"], self._helm_command
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    # name and url are caller-supplied: quote them so that embedded spaces or
    # shell metacharacters cannot split into / inject extra arguments.
    # NOTE(review): assumes _local_async_exec tokenizes the command
    # shell-style (e.g. shlex.split) - confirm
    command = "env KUBECONFIG={} {} repo add {} {}".format(
        paths["kube_config"],
        self._helm_command,
        shlex.quote(name),
        shlex.quote(url),
    )
    self.log.debug("adding repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000190
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: uuid of the cluster whose helm environment is queried
    :return: list of registered repositories with lowercased keys; an empty
        list when the command fails or prints nothing
    """
    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    list_cmd = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # errors are not raised: having no repos should just give an empty list
    raw_output, return_code = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    if return_code != 0 or not raw_output:
        return []

    parsed_repos = yaml.load(raw_output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed_repos)
229
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a registered helm repository from the given cluster environment.

    Runs "helm repo remove" asking the executor to raise on error, syncing the
    cluster directory before and after.

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name of the repository to remove
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # name is caller-supplied: quote it so that embedded spaces or shell
    # metacharacters cannot split into / inject extra arguments.
    # NOTE(review): assumes _local_async_exec tokenizes the command
    # shell-style (e.g. shlex.split) - confirm
    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000252
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the local and synced directory of the cluster and, optionally,
    the releases and software deployed on it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: when uninstall_sw is set and the cluster still has releases,
        uninstall them (best effort); without force the releases are kept and
        the software uninstall is skipped
    :param uninstall_sw: when True, deployed releases are handled as described
        for force and then _uninstall_sw is invoked for the cluster
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # bound before the try so the error handler can always
                    # reference it, even if reading the release fails
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_uuid)
                self.log.warning(msg)
                uninstall_sw = (
                    False  # Allow to remove k8s cluster without removing Tiller
                )

    if uninstall_sw:
        await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
324
lloretgalleg095392b2020-11-20 11:28:08 +0000325 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100326 self,
327 cluster_id: str,
328 kdu_model: str,
329 paths: dict,
330 env: dict,
331 kdu_instance: str,
332 atomic: bool = True,
333 timeout: float = 300,
334 params: dict = None,
335 db_dict: dict = None,
336 kdu_name: str = None,
337 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000338 ):
bravof53dd7462021-11-17 11:14:57 -0300339 # init env, paths
340 paths, env = self._init_paths_env(
341 cluster_name=cluster_id, create_if_not_exist=True
342 )
343
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000344 # params to str
345 params_str, file_to_delete = self._params_to_file_option(
346 cluster_id=cluster_id, params=params
347 )
348
349 # version
350 version = None
351 if ":" in kdu_model:
352 parts = kdu_model.split(sep=":")
353 if len(parts) == 2:
354 version = str(parts[1])
355 kdu_model = parts[0]
356
garciadeblas82b591c2021-03-24 09:22:13 +0100357 command = self._get_install_command(
bravof53dd7462021-11-17 11:14:57 -0300358 kdu_model,
359 kdu_instance,
360 namespace,
361 params_str,
362 version,
363 atomic,
364 timeout,
365 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100366 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000367
368 self.log.debug("installing: {}".format(command))
369
370 if atomic:
371 # exec helm in a task
372 exec_task = asyncio.ensure_future(
373 coro_or_future=self._local_async_exec(
374 command=command, raise_exception_on_error=False, env=env
375 )
376 )
377
378 # write status in another task
379 status_task = asyncio.ensure_future(
380 coro_or_future=self._store_status(
381 cluster_id=cluster_id,
382 kdu_instance=kdu_instance,
383 namespace=namespace,
384 db_dict=db_dict,
385 operation="install",
386 run_once=False,
387 )
388 )
389
390 # wait for execution task
391 await asyncio.wait([exec_task])
392
393 # cancel status task
394 status_task.cancel()
395
396 output, rc = exec_task.result()
397
398 else:
399
400 output, rc = await self._local_async_exec(
401 command=command, raise_exception_on_error=False, env=env
402 )
403
404 # remove temporal values yaml file
405 if file_to_delete:
406 os.remove(file_to_delete)
407
408 # write final status
409 await self._store_status(
410 cluster_id=cluster_id,
411 kdu_instance=kdu_instance,
412 namespace=namespace,
413 db_dict=db_dict,
414 operation="install",
415 run_once=True,
416 check_every=0,
417 )
418
419 if rc != 0:
420 msg = "Error executing command: {}\nOutput: {}".format(command, output)
421 self.log.error(msg)
422 raise K8sException(msg)
423
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance and return its new revision.

    :param cluster_uuid: uuid of the cluster where the instance lives
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference, optionally suffixed with ":<version>".
        When omitted it is forwarded as None to the upgrade command builder
    :param atomic: when True the command runs in a task while a second task
        keeps writing the operation status; it is also forwarded to the
        upgrade command builder
    :param timeout: forwarded to the upgrade command builder
    :param params: values written to a temporary file passed to the command
    :param db_dict: forwarded to the status writer
    :return: revision of the instance after the upgrade, 0 if the instance
        cannot be found afterwards
    :raises K8sException: when the instance does not exist or the command
        exits with a non-zero code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    try:
        # version (kdu_model defaults to None: nothing to split in that case)
        version = None
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # cancel status task, also when this coroutine is interrupted
                # while waiting, so the status writer is never left running
                status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also when the execution raised
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
544
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scaling is not provided by this connector.

    :raises NotImplementedError: always
    """
    not_implemented_msg = "Method not implemented"
    raise NotImplementedError(not_implemented_msg)
554
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Reading the scale count is not provided by this connector.

    :raises NotImplementedError: always
    """
    not_implemented_msg = "Method not implemented"
    raise NotImplementedError(not_implemented_msg)
562
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a KDU instance back to a revision and return its new revision.

    :param cluster_uuid: uuid of the cluster where the instance lives
    :param kdu_instance: name of the release to roll back
    :param revision: revision forwarded to the rollback command builder
    :param db_dict: forwarded to the status writer
    :return: revision of the instance after the rollback, 0 if the instance
        cannot be found afterwards
    :raises K8sException: when the instance does not exist or the command
        exits with a non-zero code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = self._get_rollback_command(
        kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
    )

    self.log.debug("rolling_back: {}".format(command))

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        coro_or_future=self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        coro_or_future=self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    try:
        # wait for execution task
        await asyncio.wait([exec_task])
    finally:
        # cancel status task, also when this coroutine is interrupted while
        # waiting, so the status writer is never left running
        status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
649
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance is not found; otherwise the command output
        converted by _output_to_table
    """
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the instance record provides the namespace; nothing to do if it is gone
    release_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release_info:
        not_found_msg = "kdu_instance {} not found".format(kdu_instance)
        self.log.warning(not_found_msg)
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    release_namespace = release_info["namespace"]
    uninstall_cmd = self._get_uninstall_command(
        kdu_instance, release_namespace, paths["kube_config"]
    )
    cmd_output, _rc = await self._local_async_exec(
        command=uninstall_cmd, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(cmd_output)
695
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    The cluster directory is synced before and after the internal listing.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the internal _instances_list call returns
    """
    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    releases = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
716
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a deployed release by name.

    :param cluster_uuid: cluster whose releases are listed
    :param kdu_instance: release name to look for
    :return: the first listed release whose "name" equals kdu_instance,
        None when there is no such release
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    matching = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if matching is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return matching
724
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Not available for this connector: the call always fails.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    unsupported_msg = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported_msg)
751
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths (only the paths are needed here)
    paths, _env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get list of services names for kdu
    kube_config = paths["kube_config"]
    names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, kube_config
    )

    # resolve every name, one after another, keeping the order of the names
    services = [
        await self._get_service(cluster_uuid, service_name, namespace)
        for service_name in names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return services
799
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of one service, syncing the cluster directory around it.

    :param cluster_uuid: cluster where the service is looked up
    :param service_name: name of the service
    :param namespace: namespace of the service
    :return: whatever the internal _get_service call returns
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    service_data = await self._get_service(cluster_uuid, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_data
819
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    Retrieve the current status of a KDU instance deployed in a cluster.

    The instance is first looked up in the list of instances of the cluster in
    order to learn its namespace; its status is then obtained through
    `_status_kdu` (called with `show_error_log=True` and `return_text=True`).

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: the status produced by `_status_kdu` for the instance
    :raises K8sException: if no instance with that name exists in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # refresh the local dir from the shared file system
    self.fs.sync(from_path=cluster_uuid)

    # the first instance with the requested name gives the namespace
    deployed = await self._instances_list(cluster_id=cluster_uuid)
    found = next(
        (entry for entry in deployed if entry.get("name") == kdu_instance), None
    )
    if found is None:
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    kdu_status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=found["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # push local changes back to the shared file system
    self.fs.reverse_sync(from_path=cluster_uuid)

    return kdu_status
878
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a KDU model.

    :param kdu_model: chart reference handed to the "values" inspect command
    :param repo_url: optional url of the repo where the chart is looked up
    :return: output of the inspect command
    """
    debug_text = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_text)

    inspect_output = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspect_output
890
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a KDU model.

    :param kdu_model: chart reference handed to the "readme" inspect command
    :param repo_url: optional url of the repo where the chart is looked up
    :return: output of the inspect command
    """
    debug_text = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_text)

    inspect_output = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspect_output
900
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repos registered in the cluster with those of the database.

    Repos of the database that are missing locally, or registered locally with
    a different url, are (re)added. Local repos that are not in the database,
    other than "stable", are removed; a failure removing one of those is only
    logged as a warning.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :return: tuple (deleted_repo_list, added_repo_dict). `deleted_repo_list`
        holds the database `_id` of the repos removed because their url
        changed and the name of the local repos removed because they are not
        in the database; `added_repo_dict` maps the database `_id` of every
        added repo to its name
    :raises K8sException: if a repo of the database cannot be (re)added
    :raises Exception: on any other error while synchronizing
    """
    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # repos of the database that must be added if not present
        for db_repo in db_repo_dict.values():
            # resolved before the try so the error message can always use it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and and again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # delete repos that are present locally but not in the database
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repo that cannot be removed must not
                    # abort the synchronization
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # unexpected errors are logged and reported to the caller
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
966
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000967 def _get_db_repos_dict(self, repo_ids: list):
968 db_repos_dict = {}
969 for repo_id in repo_ids:
970 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
971 db_repos_dict[db_repo["name"]] = db_repo
972 return db_repos_dict
973
974 """
975 ####################################################################################
976 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
977 ####################################################################################
978 """
979
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Create the base cluster and kube directories and return them.

    Also creates the helm3 dirs according to the new directory specification;
    those paths are not returned but assigned to helm environment variables.

    :param cluster_name: name of the cluster (callers in this class pass the
        cluster uuid/id)
    :param create_if_not_exist: presumably whether missing directories are
        created; the exact contract is defined by the subclasses - TODO confirm
    :return: tuple (paths, env): dictionary with the config paths (callers read
        its "kube_config" entry) and dictionary with the helm environment
        variables
    """
990
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization.

    :param cluster_id: id of the cluster to initialize
    :param namespace: K8s namespace to use
    :param paths: presumably the paths dictionary returned by
        `_init_paths_env` - TODO confirm against the subclasses
    :param env: presumably the helm environment variables returned by
        `_init_paths_env` - TODO confirm against the subclasses
    """
996
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list.

    :param cluster_id: id of the cluster whose instances are listed
    :return: iterable of instances; `status_kdu` reads the "name" and
        "namespace" entries of each of them
    """
1002
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm
    instance.

    :param cluster_id: id of the cluster where the instance lives
    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace of the instance
    :param kubeconfig: kube config path (`get_services` passes the
        "kube_config" entry of the paths returned by `_init_paths_env`)
    :return: iterable of service names (`get_services` passes each of them to
        `_get_service`)
    """
1008
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Implements the helm version dependent method to obtain status of a helm
    instance.

    :param cluster_id: id of the cluster where the instance lives
    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace of the instance
    :param show_error_log: presumably whether command errors are logged -
        TODO confirm against the subclasses
    :param return_text: presumably selects a textual result instead of a
        structured one - TODO confirm against the subclasses
    :return: status of the instance; with `return_text=False` callers in this
        class read `.get("info").get("description")` from the result
    """
1021
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance.

    The meaning of each parameter is defined by the helm version dependent
    subclasses.

    :return: the command, as a string
    """
1037
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance.

    The meaning of each parameter is defined by the helm version dependent
    subclasses.

    :return: the command, as a string
    """
1053
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance.

    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace of the instance
    :param revision: revision to roll back to
    :param kubeconfig: kube config to be used by the command
    :return: the command, as a string
    """
1061
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance.

    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace of the instance
    :param kubeconfig: kube config to be used by the command
    :return: the command, as a string
    """
1069
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain command to be executed to obtain information about the kdu.

    :param show_command: what to inspect (this class uses "values" and
        "readme")
    :param kdu_model: chart reference
    :param repo_str: repo option already formatted by the caller: either an
        empty string or " --repo <url>"
    :param version: version option already formatted by the caller: either an
        empty string or "--version <version>"
    :return: the command; the caller executes it with `_local_async_exec`
    """
1077
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method depends
    on the helm version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: id of the cluster
    :param namespace: K8s namespace
    """
1086
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :return: list of repo identifiers; `synchronize_repos` hands it to
        `_get_db_repos_dict`, which uses each one as the `_id` of a "k8srepos"
        database record
    """
1092
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001093 """
1094 ####################################################################################
1095 ################################### P R I V A T E ##################################
1096 ####################################################################################
1097 """
1098
1099 @staticmethod
1100 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1101 if os.path.exists(filename):
1102 return True
1103 else:
1104 msg = "File {} does not exist".format(filename)
1105 if exception_if_not_exists:
1106 raise K8sException(msg)
1107
1108 @staticmethod
1109 def _remove_multiple_spaces(strobj):
1110 strobj = strobj.strip()
1111 while " " in strobj:
1112 strobj = strobj.replace(" ", " ")
1113 return strobj
1114
1115 @staticmethod
1116 def _output_to_lines(output: str) -> list:
1117 output_lines = list()
1118 lines = output.splitlines(keepends=False)
1119 for line in lines:
1120 line = line.strip()
1121 if len(line) > 0:
1122 output_lines.append(line)
1123 return output_lines
1124
1125 @staticmethod
1126 def _output_to_table(output: str) -> list:
1127 output_table = list()
1128 lines = output.splitlines(keepends=False)
1129 for line in lines:
1130 line = line.replace("\t", " ")
1131 line_list = list()
1132 output_table.append(line_list)
1133 cells = line.split(sep=" ")
1134 for cell in cells:
1135 cell = cell.strip()
1136 if len(cell) > 0:
1137 line_list.append(cell)
1138 return output_table
1139
1140 @staticmethod
1141 def _parse_services(output: str) -> list:
1142 lines = output.splitlines(keepends=False)
1143 services = []
1144 for line in lines:
1145 line = line.replace("\t", " ")
1146 cells = line.split(sep=" ")
1147 if len(cells) > 0 and cells[0].startswith("service/"):
1148 elems = cells[0].split(sep="/")
1149 if len(elems) > 1:
1150 services.append(elems[1])
1151 return services
1152
1153 @staticmethod
1154 def _get_deep(dictionary: dict, members: tuple):
1155 target = dictionary
1156 value = None
1157 try:
1158 for m in members:
1159 value = target.get(m)
1160 if not value:
1161 return None
1162 else:
1163 target = value
1164 except Exception:
1165 pass
1166 return value
1167
1168 # find key:value in several lines
1169 @staticmethod
1170 def _find_in_lines(p_lines: list, p_key: str) -> str:
1171 for line in p_lines:
1172 try:
1173 if line.startswith(p_key + ":"):
1174 parts = line.split(":")
1175 the_value = parts[1].strip()
1176 return the_value
1177 except Exception:
1178 # ignore it
1179 pass
1180 return None
1181
1182 @staticmethod
1183 def _lower_keys_list(input_list: list):
1184 """
1185 Transform the keys in a list of dictionaries to lower case and returns a new list
1186 of dictionaries
1187 """
1188 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001189 if input_list:
1190 for dictionary in input_list:
1191 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1192 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001193 return new_list
1194
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001195 async def _local_async_exec(
1196 self,
1197 command: str,
1198 raise_exception_on_error: bool = False,
1199 show_error_log: bool = True,
1200 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001201 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001202 ) -> (str, int):
1203
1204 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001205 self.log.debug(
1206 "Executing async local command: {}, env: {}".format(command, env)
1207 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001208
1209 # split command
1210 command = shlex.split(command)
1211
1212 environ = os.environ.copy()
1213 if env:
1214 environ.update(env)
1215
1216 try:
1217 process = await asyncio.create_subprocess_exec(
garciadeblas82b591c2021-03-24 09:22:13 +01001218 *command,
1219 stdout=asyncio.subprocess.PIPE,
1220 stderr=asyncio.subprocess.PIPE,
1221 env=environ,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001222 )
1223
1224 # wait for command terminate
1225 stdout, stderr = await process.communicate()
1226
1227 return_code = process.returncode
1228
1229 output = ""
1230 if stdout:
1231 output = stdout.decode("utf-8").strip()
1232 # output = stdout.decode()
1233 if stderr:
1234 output = stderr.decode("utf-8").strip()
1235 # output = stderr.decode()
1236
1237 if return_code != 0 and show_error_log:
1238 self.log.debug(
1239 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1240 )
1241 else:
1242 self.log.debug("Return code: {}".format(return_code))
1243
1244 if raise_exception_on_error and return_code != 0:
1245 raise K8sException(output)
1246
1247 if encode_utf8:
1248 output = output.encode("utf-8").strip()
1249 output = str(output).replace("\\n", "\n")
1250
1251 return output, return_code
1252
1253 except asyncio.CancelledError:
1254 raise
1255 except K8sException:
1256 raise
1257 except Exception as e:
1258 msg = "Exception executing command: {} -> {}".format(command, e)
1259 self.log.error(msg)
1260 if raise_exception_on_error:
1261 raise K8sException(e) from e
1262 else:
1263 return "", -1
1264
garciadeblas82b591c2021-03-24 09:22:13 +01001265 async def _local_async_exec_pipe(
1266 self,
1267 command1: str,
1268 command2: str,
1269 raise_exception_on_error: bool = True,
1270 show_error_log: bool = True,
1271 encode_utf8: bool = False,
1272 env: dict = None,
1273 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001274
1275 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1276 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1277 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001278 self.log.debug(
1279 "Executing async local command: {}, env: {}".format(command, env)
1280 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001281
1282 # split command
1283 command1 = shlex.split(command1)
1284 command2 = shlex.split(command2)
1285
1286 environ = os.environ.copy()
1287 if env:
1288 environ.update(env)
1289
1290 try:
1291 read, write = os.pipe()
1292 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1293 os.close(write)
garciadeblas82b591c2021-03-24 09:22:13 +01001294 process_2 = await asyncio.create_subprocess_exec(
1295 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1296 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001297 os.close(read)
1298 stdout, stderr = await process_2.communicate()
1299
1300 return_code = process_2.returncode
1301
1302 output = ""
1303 if stdout:
1304 output = stdout.decode("utf-8").strip()
1305 # output = stdout.decode()
1306 if stderr:
1307 output = stderr.decode("utf-8").strip()
1308 # output = stderr.decode()
1309
1310 if return_code != 0 and show_error_log:
1311 self.log.debug(
1312 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1313 )
1314 else:
1315 self.log.debug("Return code: {}".format(return_code))
1316
1317 if raise_exception_on_error and return_code != 0:
1318 raise K8sException(output)
1319
1320 if encode_utf8:
1321 output = output.encode("utf-8").strip()
1322 output = str(output).replace("\\n", "\n")
1323
1324 return output, return_code
1325 except asyncio.CancelledError:
1326 raise
1327 except K8sException:
1328 raise
1329 except Exception as e:
1330 msg = "Exception executing command: {} -> {}".format(command, e)
1331 self.log.error(msg)
1332 if raise_exception_on_error:
1333 raise K8sException(e) from e
1334 else:
1335 return "", -1
1336
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The service is read with `kubectl get service -o=yaml` and the YAML
    answer is parsed with the safe loader.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips; only present for services of type
      LoadBalancer, and empty while no ip has been assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # there is no ingress information until the load balancer has been
        # provisioned, and an ingress entry may carry a hostname and no ip
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1380
1381 async def _exec_inspect_comand(
1382 self, inspect_command: str, kdu_model: str, repo_url: str = None
1383 ):
1384 """
1385 Obtains information about a kdu, no cluster (no env)
1386 """
1387
1388 repo_str = ""
1389 if repo_url:
1390 repo_str = " --repo {}".format(repo_url)
1391
1392 idx = kdu_model.find("/")
1393 if idx >= 0:
1394 idx += 1
1395 kdu_model = kdu_model[idx:]
1396
1397 version = ""
1398 if ":" in kdu_model:
1399 parts = kdu_model.split(sep=":")
1400 if len(parts) == 2:
1401 version = "--version {}".format(str(parts[1]))
1402 kdu_model = parts[0]
1403
garciadeblas82b591c2021-03-24 09:22:13 +01001404 full_command = self._get_inspect_command(
1405 inspect_command, kdu_model, repo_str, version
1406 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001407 output, _rc = await self._local_async_exec(
1408 command=full_command, encode_utf8=True
1409 )
1410
1411 return output
1412
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Periodically read the status of a KDU instance and write it to the
    database.

    Every iteration sleeps `check_every` seconds, obtains the detailed status
    through `_status_kdu` and stores it with `write_app_status_to_db`. The
    loop ends when the write returns a falsy result, when the task is
    cancelled, or after the first iteration if `run_once` is set. Any other
    error is logged at debug level and the loop goes on.

    :param cluster_id: id of the cluster where the instance lives
    :param operation: operation name, handed to `write_app_status_to_db`
    :param kdu_instance: name of the KDU instance
    :param namespace: K8s namespace of the instance
    :param check_every: seconds to sleep before each status read
    :param db_dict: handed to `write_app_status_to_db`
    :param run_once: perform a single iteration
    """
    while True:
        try:
            # the wait comes first, also when run_once is set
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                return_text=False,
            )
            # a status without "info" raises here and is handled by the
            # generic handler below
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            # cancellation ends the task silently, it is not re-raised
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # best effort: log the error and retry in the next iteration
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )
            pass
        finally:
            # returning from `finally` ends the coroutine after the first
            # pass whatever happened in it, discarding any exception that
            # was still propagating
            if run_once:
                return
1455
1456 # params for use in -f file
1457 # returns values file option and filename (in order to delete it at the end)
1458 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1459
1460 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001461 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001462
1463 def get_random_number():
1464 r = random.randrange(start=1, stop=99999999)
1465 s = str(r)
1466 while len(s) < 10:
1467 s = "0" + s
1468 return s
1469
1470 params2 = dict()
1471 for key in params:
1472 value = params.get(key)
1473 if "!!yaml" in str(value):
1474 value = yaml.load(value[7:])
1475 params2[key] = value
1476
1477 values_file = get_random_number() + ".yaml"
1478 with open(values_file, "w") as stream:
1479 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1480
1481 return "-f {}".format(values_file), values_file
1482
1483 return "", None
1484
1485 # params for use in --set option
1486 @staticmethod
1487 def _params_to_set_option(params: dict) -> str:
1488 params_str = ""
1489 if params and len(params) > 0:
1490 start = True
1491 for key in params:
1492 value = params.get(key, None)
1493 if value is not None:
1494 if start:
1495 params_str += "--set "
1496 start = False
1497 else:
1498 params_str += ","
1499 params_str += "{}={}".format(key, value)
1500 return params_str
1501
1502 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01001503 def generate_kdu_instance_name(**kwargs):
1504 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001505 # check embeded chart (file or dir)
1506 if chart_name.startswith("/"):
1507 # extract file or directory name
David Garcia4ae527e2021-07-26 16:04:59 +02001508 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001509 # check URL
1510 elif "://" in chart_name:
1511 # extract last portion of URL
David Garcia4ae527e2021-07-26 16:04:59 +02001512 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001513
1514 name = ""
1515 for c in chart_name:
1516 if c.isalpha() or c.isnumeric():
1517 name += c
1518 else:
1519 name += "-"
1520 if len(name) > 35:
1521 name = name[0:35]
1522
1523 # if does not start with alpha character, prefix 'a'
1524 if not name[0].isalpha():
1525 name = "a" + name
1526
1527 name += "-"
1528
1529 def get_random_number():
1530 r = random.randrange(start=1, stop=99999999)
1531 s = str(r)
1532 s = s.rjust(10, "0")
1533 return s
1534
1535 name = name + get_random_number()
1536 return name.lower()