blob: dd2116c448d46116fc0146fab8d0fdcc34c6440f [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
David Garcia4395cfa2021-05-28 16:21:51 +020073 self.config = EnvironConfig()
lloretgalleg1c83f2e2020-10-22 09:12:35 +000074 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
David Garcia4395cfa2021-05-28 16:21:51 +020089 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
lloretgalleg83e55892020-12-17 12:42:11 +000092
Pedro Escaleirab41de172022-04-02 00:44:08 +010093 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000094 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010095 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +000098 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010099
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000105
106 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100107 self,
108 k8s_creds: str,
109 namespace: str = "kube-system",
110 reuse_cluster_uuid=None,
111 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000112 ) -> (str, bool):
113 """
114 It prepares a given K8s cluster environment to run Charts
115
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 '.kube/config'
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200121 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
125 """
126
127 if reuse_cluster_uuid:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100128 cluster_id = reuse_cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000129 else:
130 cluster_id = str(uuid4())
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000131
garciadeblas82b591c2021-03-24 09:22:13 +0100132 self.log.debug(
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
134 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000135
136 paths, env = self._init_paths_env(
137 cluster_name=cluster_id, create_if_not_exist=True
138 )
139 mode = stat.S_IRUSR | stat.S_IWUSR
140 with open(paths["kube_config"], "w", mode) as f:
141 f.write(k8s_creds)
142 os.chmod(paths["kube_config"], 0o600)
143
144 # Code with initialization specific of helm version
145 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
146
147 # sync fs with local data
148 self.fs.reverse_sync(from_path=cluster_id)
149
150 self.log.info("Cluster {} initialized".format(cluster_id))
151
Pedro Escaleirab41de172022-04-02 00:44:08 +0100152 return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000153
154 async def repo_add(
bravof0ab522f2021-11-23 19:33:18 -0300155 self,
156 cluster_uuid: str,
157 name: str,
158 url: str,
159 repo_type: str = "chart",
160 cert: str = None,
161 user: str = None,
162 password: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000163 ):
garciadeblas82b591c2021-03-24 09:22:13 +0100164 self.log.debug(
165 "Cluster {}, adding {} repository {}. URL: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100166 cluster_uuid, repo_type, name, url
garciadeblas82b591c2021-03-24 09:22:13 +0100167 )
168 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000169
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000170 # init_env
171 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100172 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000173 )
174
bravof7bd5c6a2021-11-17 11:14:57 -0300175 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100176 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300177
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000178 # helm repo add name url
bravof0ab522f2021-11-23 19:33:18 -0300179 command = ("env KUBECONFIG={} {} repo add {} {}").format(
bravof7bd5c6a2021-11-17 11:14:57 -0300180 paths["kube_config"], self._helm_command, name, url
181 )
bravof0ab522f2021-11-23 19:33:18 -0300182
183 if cert:
184 temp_cert_file = os.path.join(
Pedro Escaleira1188b5d2022-04-22 18:51:00 +0100185 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
bravof0ab522f2021-11-23 19:33:18 -0300186 )
187 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
188 with open(temp_cert_file, "w") as the_cert:
189 the_cert.write(cert)
190 command += " --ca-file {}".format(temp_cert_file)
191
192 if user:
193 command += " --username={}".format(user)
194
195 if password:
196 command += " --password={}".format(password)
197
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000198 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100199 await self._local_async_exec(
200 command=command, raise_exception_on_error=True, env=env
201 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000202
garciadeblasd4cee8c2022-05-04 10:57:36 +0200203 # helm repo update
garciadeblas069f0a32022-05-04 11:07:41 +0200204 command = "env KUBECONFIG={} {} repo update {}".format(
205 paths["kube_config"], self._helm_command, name
garciadeblasd4cee8c2022-05-04 10:57:36 +0200206 )
207 self.log.debug("updating repo: {}".format(command))
208 await self._local_async_exec(
209 command=command, raise_exception_on_error=False, env=env
210 )
211
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000212 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100213 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000214
garciadeblas7faf4ec2022-04-08 22:53:25 +0200215 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
216 self.log.debug(
217 "Cluster {}, updating {} repository {}".format(
218 cluster_uuid, repo_type, name
219 )
220 )
221
222 # init_env
223 paths, env = self._init_paths_env(
224 cluster_name=cluster_uuid, create_if_not_exist=True
225 )
226
227 # sync local dir
228 self.fs.sync(from_path=cluster_uuid)
229
230 # helm repo update
231 command = "{} repo update {}".format(self._helm_command, name)
232 self.log.debug("updating repo: {}".format(command))
233 await self._local_async_exec(
234 command=command, raise_exception_on_error=False, env=env
235 )
236
237 # sync fs
238 self.fs.reverse_sync(from_path=cluster_uuid)
239
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000240 async def repo_list(self, cluster_uuid: str) -> list:
241 """
242 Get the list of registered repositories
243
244 :return: list of registered repositories: [ (name, url) .... ]
245 """
246
Pedro Escaleirab41de172022-04-02 00:44:08 +0100247 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000248
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000249 # config filename
250 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100251 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000252 )
253
bravof7bd5c6a2021-11-17 11:14:57 -0300254 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100255 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300256
257 command = "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths["kube_config"], self._helm_command
259 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000260
261 # Set exception to false because if there are no repos just want an empty list
262 output, _rc = await self._local_async_exec(
263 command=command, raise_exception_on_error=False, env=env
264 )
265
266 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100267 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000268
269 if _rc == 0:
270 if output and len(output) > 0:
271 repos = yaml.load(output, Loader=yaml.SafeLoader)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self._lower_keys_list(repos)
274 else:
275 return []
276 else:
277 return []
278
279 async def repo_remove(self, cluster_uuid: str, name: str):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100280 self.log.debug(
281 "remove {} repositories for cluster {}".format(name, cluster_uuid)
282 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000283
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000284 # init env, paths
285 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100286 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000287 )
288
bravof7bd5c6a2021-11-17 11:14:57 -0300289 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100290 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300291
292 command = "env KUBECONFIG={} {} repo remove {}".format(
293 paths["kube_config"], self._helm_command, name
294 )
garciadeblas82b591c2021-03-24 09:22:13 +0100295 await self._local_async_exec(
296 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000297 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000298
299 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100300 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000301
302 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100303 self,
304 cluster_uuid: str,
305 force: bool = False,
306 uninstall_sw: bool = False,
307 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000308 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200309 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000310
David Garciaeb8943a2021-04-12 12:07:37 +0200311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
312
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
318 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100319 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100320 self.log.debug(
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100322 cluster_uuid, uninstall_sw
garciadeblas82b591c2021-03-24 09:22:13 +0100323 )
324 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000325
326 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100327 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000328
329 # uninstall releases if needed.
330 if uninstall_sw:
331 releases = await self.instances_list(cluster_uuid=cluster_uuid)
332 if len(releases) > 0:
333 if force:
334 for r in releases:
335 try:
336 kdu_instance = r.get("name")
337 chart = r.get("chart")
338 self.log.debug(
339 "Uninstalling {} -> {}".format(chart, kdu_instance)
340 )
341 await self.uninstall(
342 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
343 )
344 except Exception as e:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
347 # raised an error
348 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100349 "Error uninstalling release {}: {}".format(
350 kdu_instance, e
351 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000352 )
353 else:
354 msg = (
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
Pedro Escaleirab41de172022-04-02 00:44:08 +0100356 ).format(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000357 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100358 uninstall_sw = (
359 False # Allow to remove k8s cluster without removing Tiller
360 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000361
362 if uninstall_sw:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100363 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000364
365 # delete cluster directory
Pedro Escaleirab41de172022-04-02 00:44:08 +0100366 self.log.debug("Removing directory {}".format(cluster_uuid))
367 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000368 # Remove also local directorio if still exist
Pedro Escaleirab41de172022-04-02 00:44:08 +0100369 direct = self.fs.path + "/" + cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000370 shutil.rmtree(direct, ignore_errors=True)
371
372 return True
373
garciadeblas04393192022-06-08 15:39:24 +0200374 def _is_helm_chart_a_file(self, chart_name: str):
375 return chart_name.count("/") > 1
376
lloretgalleg095392b2020-11-20 11:28:08 +0000377 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100378 self,
379 cluster_id: str,
380 kdu_model: str,
381 paths: dict,
382 env: dict,
383 kdu_instance: str,
384 atomic: bool = True,
385 timeout: float = 300,
386 params: dict = None,
387 db_dict: dict = None,
388 kdu_name: str = None,
389 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000390 ):
bravof7bd5c6a2021-11-17 11:14:57 -0300391 # init env, paths
392 paths, env = self._init_paths_env(
393 cluster_name=cluster_id, create_if_not_exist=True
394 )
395
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000396 # params to str
397 params_str, file_to_delete = self._params_to_file_option(
398 cluster_id=cluster_id, params=params
399 )
400
401 # version
aktas867418c2021-10-19 18:26:13 +0300402 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000403
garciadeblas7faf4ec2022-04-08 22:53:25 +0200404 repo = self._split_repo(kdu_model)
405 if repo:
406 self.repo_update(cluster_id, repo)
407
garciadeblas82b591c2021-03-24 09:22:13 +0100408 command = self._get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300409 kdu_model,
410 kdu_instance,
411 namespace,
412 params_str,
413 version,
414 atomic,
415 timeout,
416 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100417 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000418
419 self.log.debug("installing: {}".format(command))
420
421 if atomic:
422 # exec helm in a task
423 exec_task = asyncio.ensure_future(
424 coro_or_future=self._local_async_exec(
425 command=command, raise_exception_on_error=False, env=env
426 )
427 )
428
429 # write status in another task
430 status_task = asyncio.ensure_future(
431 coro_or_future=self._store_status(
432 cluster_id=cluster_id,
433 kdu_instance=kdu_instance,
434 namespace=namespace,
435 db_dict=db_dict,
436 operation="install",
437 run_once=False,
438 )
439 )
440
441 # wait for execution task
442 await asyncio.wait([exec_task])
443
444 # cancel status task
445 status_task.cancel()
446
447 output, rc = exec_task.result()
448
449 else:
450
451 output, rc = await self._local_async_exec(
452 command=command, raise_exception_on_error=False, env=env
453 )
454
455 # remove temporal values yaml file
456 if file_to_delete:
457 os.remove(file_to_delete)
458
459 # write final status
460 await self._store_status(
461 cluster_id=cluster_id,
462 kdu_instance=kdu_instance,
463 namespace=namespace,
464 db_dict=db_dict,
465 operation="install",
466 run_once=True,
467 check_every=0,
468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000475 async def upgrade(
476 self,
477 cluster_uuid: str,
478 kdu_instance: str,
479 kdu_model: str = None,
480 atomic: bool = True,
481 timeout: float = 300,
482 params: dict = None,
483 db_dict: dict = None,
484 ):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100485 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000486
487 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100488 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000489
490 # look for instance to obtain namespace
491 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
492 if not instance_info:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance))
494
495 # init env, paths
496 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100497 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000498 )
499
bravof7bd5c6a2021-11-17 11:14:57 -0300500 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100501 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300502
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000503 # params to str
504 params_str, file_to_delete = self._params_to_file_option(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100505 cluster_id=cluster_uuid, params=params
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000506 )
507
508 # version
aktas867418c2021-10-19 18:26:13 +0300509 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000510
garciadeblas7faf4ec2022-04-08 22:53:25 +0200511 repo = self._split_repo(kdu_model)
512 if repo:
513 self.repo_update(cluster_uuid, repo)
514
garciadeblas82b591c2021-03-24 09:22:13 +0100515 command = self._get_upgrade_command(
516 kdu_model,
517 kdu_instance,
518 instance_info["namespace"],
519 params_str,
520 version,
521 atomic,
522 timeout,
bravof7bd5c6a2021-11-17 11:14:57 -0300523 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100524 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000525
526 self.log.debug("upgrading: {}".format(command))
527
528 if atomic:
529
530 # exec helm in a task
531 exec_task = asyncio.ensure_future(
532 coro_or_future=self._local_async_exec(
533 command=command, raise_exception_on_error=False, env=env
534 )
535 )
536 # write status in another task
537 status_task = asyncio.ensure_future(
538 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100539 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000540 kdu_instance=kdu_instance,
541 namespace=instance_info["namespace"],
542 db_dict=db_dict,
543 operation="upgrade",
544 run_once=False,
545 )
546 )
547
548 # wait for execution task
549 await asyncio.wait([exec_task])
550
551 # cancel status task
552 status_task.cancel()
553 output, rc = exec_task.result()
554
555 else:
556
557 output, rc = await self._local_async_exec(
558 command=command, raise_exception_on_error=False, env=env
559 )
560
561 # remove temporal values yaml file
562 if file_to_delete:
563 os.remove(file_to_delete)
564
565 # write final status
566 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100567 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000568 kdu_instance=kdu_instance,
569 namespace=instance_info["namespace"],
570 db_dict=db_dict,
571 operation="upgrade",
572 run_once=True,
573 check_every=0,
574 )
575
576 if rc != 0:
577 msg = "Error executing command: {}\nOutput: {}".format(command, output)
578 self.log.error(msg)
579 raise K8sException(msg)
580
581 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100582 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000583
584 # return new revision number
585 instance = await self.get_instance_info(
586 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
587 )
588 if instance:
589 revision = int(instance.get("revision"))
590 self.log.debug("New revision: {}".format(revision))
591 return revision
592 else:
593 return 0
594
aktas2962f3e2021-03-15 11:05:35 +0300595 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100596 self,
597 kdu_instance: str,
598 scale: int,
599 resource_name: str,
600 total_timeout: float = 1800,
aktas867418c2021-10-19 18:26:13 +0300601 cluster_uuid: str = None,
602 kdu_model: str = None,
603 atomic: bool = True,
604 db_dict: dict = None,
garciadeblas82b591c2021-03-24 09:22:13 +0100605 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300606 ):
aktas867418c2021-10-19 18:26:13 +0300607 """Scale a resource in a Helm Chart.
608
609 Args:
610 kdu_instance: KDU instance name
611 scale: Scale to which to set the resource
612 resource_name: Resource name
613 total_timeout: The time, in seconds, to wait
614 cluster_uuid: The UUID of the cluster
615 kdu_model: The chart reference
616 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
617 The --wait flag will be set automatically if --atomic is used
618 db_dict: Dictionary for any additional data
619 kwargs: Additional parameters
620
621 Returns:
622 True if successful, False otherwise
623 """
624
Pedro Escaleirab41de172022-04-02 00:44:08 +0100625 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300626 if resource_name:
627 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100628 resource_name, kdu_model, cluster_uuid
aktas867418c2021-10-19 18:26:13 +0300629 )
630
631 self.log.debug(debug_mgs)
632
633 # look for instance to obtain namespace
634 # get_instance_info function calls the sync command
635 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
636 if not instance_info:
637 raise K8sException("kdu_instance {} not found".format(kdu_instance))
638
639 # init env, paths
640 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100641 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300642 )
643
644 # version
645 kdu_model, version = self._split_version(kdu_model)
646
647 repo_url = await self._find_repo(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300648
649 _, replica_str = await self._get_replica_count_url(
650 kdu_model, repo_url, resource_name
651 )
652
653 command = self._get_upgrade_scale_command(
654 kdu_model,
655 kdu_instance,
656 instance_info["namespace"],
657 scale,
658 version,
659 atomic,
660 replica_str,
661 total_timeout,
662 resource_name,
663 paths["kube_config"],
664 )
665
666 self.log.debug("scaling: {}".format(command))
667
668 if atomic:
669 # exec helm in a task
670 exec_task = asyncio.ensure_future(
671 coro_or_future=self._local_async_exec(
672 command=command, raise_exception_on_error=False, env=env
673 )
674 )
675 # write status in another task
676 status_task = asyncio.ensure_future(
677 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100678 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300679 kdu_instance=kdu_instance,
680 namespace=instance_info["namespace"],
681 db_dict=db_dict,
682 operation="scale",
683 run_once=False,
684 )
685 )
686
687 # wait for execution task
688 await asyncio.wait([exec_task])
689
690 # cancel status task
691 status_task.cancel()
692 output, rc = exec_task.result()
693
694 else:
695 output, rc = await self._local_async_exec(
696 command=command, raise_exception_on_error=False, env=env
697 )
698
699 # write final status
700 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100701 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300702 kdu_instance=kdu_instance,
703 namespace=instance_info["namespace"],
704 db_dict=db_dict,
705 operation="scale",
706 run_once=True,
707 check_every=0,
708 )
709
710 if rc != 0:
711 msg = "Error executing command: {}\nOutput: {}".format(command, output)
712 self.log.error(msg)
713 raise K8sException(msg)
714
715 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100716 self.fs.reverse_sync(from_path=cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300717
718 return True
aktas2962f3e2021-03-15 11:05:35 +0300719
720 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100721 self,
722 resource_name: str,
723 kdu_instance: str,
aktas867418c2021-10-19 18:26:13 +0300724 cluster_uuid: str,
725 kdu_model: str,
garciadeblas82b591c2021-03-24 09:22:13 +0100726 **kwargs,
aktas867418c2021-10-19 18:26:13 +0300727 ) -> int:
728 """Get a resource scale count.
729
730 Args:
731 cluster_uuid: The UUID of the cluster
732 resource_name: Resource name
733 kdu_instance: KDU instance name
Pedro Escaleira547f8232022-06-03 19:48:46 +0100734 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +0300735 kwargs: Additional parameters
736
737 Returns:
738 Resource instance count
739 """
740
aktas867418c2021-10-19 18:26:13 +0300741 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100742 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300743 )
744
745 # look for instance to obtain namespace
746 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
747 if not instance_info:
748 raise K8sException("kdu_instance {} not found".format(kdu_instance))
749
750 # init env, paths
751 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100752 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300753 )
754
755 replicas = await self._get_replica_count_instance(
Pedro Escaleiraaa5deb72022-06-05 01:29:57 +0100756 kdu_instance=kdu_instance,
757 namespace=instance_info["namespace"],
758 kubeconfig=paths["kube_config"],
759 resource_name=resource_name,
aktas867418c2021-10-19 18:26:13 +0300760 )
761
762 # Get default value if scale count is not found from provided values
763 if not replicas:
Pedro Escaleira547f8232022-06-03 19:48:46 +0100764 repo_url = await self._find_repo(
765 kdu_model=kdu_model, cluster_uuid=cluster_uuid
766 )
aktas867418c2021-10-19 18:26:13 +0300767 replicas, _ = await self._get_replica_count_url(
Pedro Escaleira547f8232022-06-03 19:48:46 +0100768 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
aktas867418c2021-10-19 18:26:13 +0300769 )
770
771 if not replicas:
772 msg = "Replica count not found. Cannot be scaled"
773 self.log.error(msg)
774 raise K8sException(msg)
775
776 return int(replicas)
aktas2962f3e2021-03-15 11:05:35 +0300777
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000778 async def rollback(
779 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
780 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000781 self.log.debug(
782 "rollback kdu_instance {} to revision {} from cluster {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100783 kdu_instance, revision, cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000784 )
785 )
786
787 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100788 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000789
790 # look for instance to obtain namespace
791 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
792 if not instance_info:
793 raise K8sException("kdu_instance {} not found".format(kdu_instance))
794
795 # init env, paths
796 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100797 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000798 )
799
bravof7bd5c6a2021-11-17 11:14:57 -0300800 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100801 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300802
garciadeblas82b591c2021-03-24 09:22:13 +0100803 command = self._get_rollback_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300804 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
garciadeblas82b591c2021-03-24 09:22:13 +0100805 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000806
807 self.log.debug("rolling_back: {}".format(command))
808
809 # exec helm in a task
810 exec_task = asyncio.ensure_future(
811 coro_or_future=self._local_async_exec(
812 command=command, raise_exception_on_error=False, env=env
813 )
814 )
815 # write status in another task
816 status_task = asyncio.ensure_future(
817 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100818 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000819 kdu_instance=kdu_instance,
820 namespace=instance_info["namespace"],
821 db_dict=db_dict,
822 operation="rollback",
823 run_once=False,
824 )
825 )
826
827 # wait for execution task
828 await asyncio.wait([exec_task])
829
830 # cancel status task
831 status_task.cancel()
832
833 output, rc = exec_task.result()
834
835 # write final status
836 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100837 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000838 kdu_instance=kdu_instance,
839 namespace=instance_info["namespace"],
840 db_dict=db_dict,
841 operation="rollback",
842 run_once=True,
843 check_every=0,
844 )
845
846 if rc != 0:
847 msg = "Error executing command: {}\nOutput: {}".format(command, output)
848 self.log.error(msg)
849 raise K8sException(msg)
850
851 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100852 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000853
854 # return new revision number
855 instance = await self.get_instance_info(
856 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
857 )
858 if instance:
859 revision = int(instance.get("revision"))
860 self.log.debug("New revision: {}".format(revision))
861 return revision
862 else:
863 return 0
864
David Garciaeb8943a2021-04-12 12:07:37 +0200865 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000866 """
867 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
868 (this call should happen after all _terminate-config-primitive_ of the VNF
869 are invoked).
870
871 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
872 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200873 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000874 :return: True if successful
875 """
876
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000877 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100878 "uninstall kdu_instance {} from cluster {}".format(
879 kdu_instance, cluster_uuid
880 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000881 )
882
883 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100884 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000885
886 # look for instance to obtain namespace
887 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
888 if not instance_info:
David Garcia7add1872021-08-18 14:52:52 +0200889 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
890 return True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000891 # init env, paths
892 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100893 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000894 )
895
bravof7bd5c6a2021-11-17 11:14:57 -0300896 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100897 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300898
899 command = self._get_uninstall_command(
900 kdu_instance, instance_info["namespace"], paths["kube_config"]
901 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000902 output, _rc = await self._local_async_exec(
903 command=command, raise_exception_on_error=True, env=env
904 )
905
906 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100907 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000908
909 return self._output_to_table(output)
910
911 async def instances_list(self, cluster_uuid: str) -> list:
912 """
913 returns a list of deployed releases in a cluster
914
915 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
916 :return:
917 """
918
Pedro Escaleirab41de172022-04-02 00:44:08 +0100919 self.log.debug("list releases for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000920
921 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100922 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000923
924 # execute internal command
Pedro Escaleirab41de172022-04-02 00:44:08 +0100925 result = await self._instances_list(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000926
927 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100928 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000929
930 return result
931
932 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
933 instances = await self.instances_list(cluster_uuid=cluster_uuid)
934 for instance in instances:
935 if instance.get("name") == kdu_instance:
936 return instance
937 self.log.debug("Instance {} not found".format(kdu_instance))
938 return None
939
aticig8070c3c2022-04-18 00:31:42 +0300940 async def upgrade_charm(
941 self,
942 ee_id: str = None,
943 path: str = None,
944 charm_id: str = None,
945 charm_type: str = None,
946 timeout: float = None,
947 ) -> str:
948 """This method upgrade charms in VNFs
949
950 Args:
951 ee_id: Execution environment id
952 path: Local path to the charm
953 charm_id: charm-id
954 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
955 timeout: (Float) Timeout for the ns update operation
956
957 Returns:
958 The output of the update operation if status equals to "completed"
959 """
960 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
961
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000962 async def exec_primitive(
963 self,
964 cluster_uuid: str = None,
965 kdu_instance: str = None,
966 primitive_name: str = None,
967 timeout: float = 300,
968 params: dict = None,
969 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200970 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000971 ) -> str:
972 """Exec primitive (Juju action)
973
974 :param cluster_uuid: The UUID of the cluster or namespace:cluster
975 :param kdu_instance: The unique name of the KDU instance
976 :param primitive_name: Name of action that will be executed
977 :param timeout: Timeout for action execution
978 :param params: Dictionary of all the parameters needed for the action
979 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200980 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000981
982 :return: Returns the output of the action
983 """
984 raise K8sException(
985 "KDUs deployed with Helm don't support actions "
986 "different from rollback, upgrade and status"
987 )
988
garciadeblas82b591c2021-03-24 09:22:13 +0100989 async def get_services(
990 self, cluster_uuid: str, kdu_instance: str, namespace: str
991 ) -> list:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000992 """
993 Returns a list of services defined for the specified kdu instance.
994
995 :param cluster_uuid: UUID of a K8s cluster known by OSM
996 :param kdu_instance: unique name for the KDU instance
997 :param namespace: K8s namespace used by the KDU instance
998 :return: If successful, it will return a list of services, Each service
999 can have the following data:
1000 - `name` of the service
1001 - `type` type of service in the k8 cluster
1002 - `ports` List of ports offered by the service, for each port includes at least
1003 name, port, protocol
1004 - `cluster_ip` Internal ip to be used inside k8s cluster
1005 - `external_ip` List of external ips (in case they are available)
1006 """
1007
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001008 self.log.debug(
1009 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1010 cluster_uuid, kdu_instance
1011 )
1012 )
1013
bravof7bd5c6a2021-11-17 11:14:57 -03001014 # init env, paths
1015 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001016 cluster_name=cluster_uuid, create_if_not_exist=True
bravof7bd5c6a2021-11-17 11:14:57 -03001017 )
1018
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001019 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001020 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001021
1022 # get list of services names for kdu
bravof7bd5c6a2021-11-17 11:14:57 -03001023 service_names = await self._get_services(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001024 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
bravof7bd5c6a2021-11-17 11:14:57 -03001025 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001026
1027 service_list = []
1028 for service in service_names:
Pedro Escaleirab41de172022-04-02 00:44:08 +01001029 service = await self._get_service(cluster_uuid, service, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001030 service_list.append(service)
1031
1032 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001033 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001034
1035 return service_list
1036
garciadeblas82b591c2021-03-24 09:22:13 +01001037 async def get_service(
1038 self, cluster_uuid: str, service_name: str, namespace: str
1039 ) -> object:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001040
1041 self.log.debug(
1042 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
garciadeblas82b591c2021-03-24 09:22:13 +01001043 service_name, namespace, cluster_uuid
1044 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001045 )
1046
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001047 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001048 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001049
Pedro Escaleirab41de172022-04-02 00:44:08 +01001050 service = await self._get_service(cluster_uuid, service_name, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001051
1052 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001053 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001054
1055 return service
1056
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001057 async def status_kdu(
1058 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1059 ) -> Union[str, dict]:
David Garciaeb8943a2021-04-12 12:07:37 +02001060 """
1061 This call would retrieve tha current state of a given KDU instance. It would be
1062 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1063 values_ of the configuration parameters applied to a given instance. This call
1064 would be based on the `status` call.
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001065
David Garciaeb8943a2021-04-12 12:07:37 +02001066 :param cluster_uuid: UUID of a K8s cluster known by OSM
1067 :param kdu_instance: unique name for the KDU instance
1068 :param kwargs: Additional parameters (None yet)
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001069 :param yaml_format: if the return shall be returned as an YAML string or as a
1070 dictionary
David Garciaeb8943a2021-04-12 12:07:37 +02001071 :return: If successful, it will return the following vector of arguments:
1072 - K8s `namespace` in the cluster where the KDU lives
1073 - `state` of the KDU instance. It can be:
1074 - UNKNOWN
1075 - DEPLOYED
1076 - DELETED
1077 - SUPERSEDED
1078 - FAILED or
1079 - DELETING
1080 - List of `resources` (objects) that this release consists of, sorted by kind,
1081 and the status of those resources
1082 - Last `deployment_time`.
1083
1084 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001085 self.log.debug(
1086 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1087 cluster_uuid, kdu_instance
1088 )
1089 )
1090
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001091 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001092 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001093
1094 # get instance: needed to obtain namespace
Pedro Escaleirab41de172022-04-02 00:44:08 +01001095 instances = await self._instances_list(cluster_id=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001096 for instance in instances:
1097 if instance.get("name") == kdu_instance:
1098 break
1099 else:
1100 # instance does not exist
garciadeblas82b591c2021-03-24 09:22:13 +01001101 raise K8sException(
1102 "Instance name: {} not found in cluster: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001103 kdu_instance, cluster_uuid
garciadeblas82b591c2021-03-24 09:22:13 +01001104 )
1105 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001106
1107 status = await self._status_kdu(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001108 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001109 kdu_instance=kdu_instance,
1110 namespace=instance["namespace"],
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001111 yaml_format=yaml_format,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001112 show_error_log=True,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001113 )
1114
1115 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001116 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001117
1118 return status
1119
aktas867418c2021-10-19 18:26:13 +03001120 async def get_values_kdu(
1121 self, kdu_instance: str, namespace: str, kubeconfig: str
1122 ) -> str:
1123
1124 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1125
1126 return await self._exec_get_command(
1127 get_command="values",
1128 kdu_instance=kdu_instance,
1129 namespace=namespace,
1130 kubeconfig=kubeconfig,
1131 )
1132
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001133 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001134 """Method to obtain the Helm Chart package's values
1135
1136 Args:
1137 kdu_model: The name or path of an Helm Chart
1138 repo_url: Helm Chart repository url
1139
1140 Returns:
1141 str: the values of the Helm Chart package
1142 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001143
1144 self.log.debug(
1145 "inspect kdu_model values {} from (optional) repo: {}".format(
1146 kdu_model, repo_url
1147 )
1148 )
1149
aktas867418c2021-10-19 18:26:13 +03001150 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001151 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1152 )
1153
1154 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1155
1156 self.log.debug(
1157 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1158 )
1159
aktas867418c2021-10-19 18:26:13 +03001160 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001161 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1162 )
1163
1164 async def synchronize_repos(self, cluster_uuid: str):
1165
1166 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1167 try:
1168 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1169 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1170
1171 local_repo_list = await self.repo_list(cluster_uuid)
1172 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1173
1174 deleted_repo_list = []
1175 added_repo_dict = {}
1176
1177 # iterate over the list of repos in the database that should be
1178 # added if not present
1179 for repo_name, db_repo in db_repo_dict.items():
1180 try:
1181 # check if it is already present
1182 curr_repo_url = local_repo_dict.get(db_repo["name"])
1183 repo_id = db_repo.get("_id")
1184 if curr_repo_url != db_repo["url"]:
1185 if curr_repo_url:
garciadeblas82b591c2021-03-24 09:22:13 +01001186 self.log.debug(
1187 "repo {} url changed, delete and and again".format(
1188 db_repo["url"]
1189 )
1190 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001191 await self.repo_remove(cluster_uuid, db_repo["name"])
1192 deleted_repo_list.append(repo_id)
1193
1194 # add repo
1195 self.log.debug("add repo {}".format(db_repo["name"]))
bravof0ab522f2021-11-23 19:33:18 -03001196 if "ca_cert" in db_repo:
1197 await self.repo_add(
1198 cluster_uuid,
1199 db_repo["name"],
1200 db_repo["url"],
1201 cert=db_repo["ca_cert"],
1202 )
1203 else:
1204 await self.repo_add(
1205 cluster_uuid,
1206 db_repo["name"],
1207 db_repo["url"],
1208 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001209 added_repo_dict[repo_id] = db_repo["name"]
1210 except Exception as e:
1211 raise K8sException(
1212 "Error adding repo id: {}, err_msg: {} ".format(
1213 repo_id, repr(e)
1214 )
1215 )
1216
1217 # Delete repos that are present but not in nbi_list
1218 for repo_name in local_repo_dict:
1219 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1220 self.log.debug("delete repo {}".format(repo_name))
1221 try:
1222 await self.repo_remove(cluster_uuid, repo_name)
1223 deleted_repo_list.append(repo_name)
1224 except Exception as e:
1225 self.warning(
1226 "Error deleting repo, name: {}, err_msg: {}".format(
1227 repo_name, str(e)
1228 )
1229 )
1230
1231 return deleted_repo_list, added_repo_dict
1232
1233 except K8sException:
1234 raise
1235 except Exception as e:
1236 # Do not raise errors synchronizing repos
1237 self.log.error("Error synchronizing repos: {}".format(e))
1238 raise Exception("Error synchronizing repos: {}".format(e))
1239
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001240 def _get_db_repos_dict(self, repo_ids: list):
1241 db_repos_dict = {}
1242 for repo_id in repo_ids:
1243 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1244 db_repos_dict[db_repo["name"]] = db_repo
1245 return db_repos_dict
1246
1247 """
1248 ####################################################################################
1249 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1250 ####################################################################################
1251 """
1252
1253 @abc.abstractmethod
1254 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1255 """
1256 Creates and returns base cluster and kube dirs and returns them.
1257 Also created helm3 dirs according to new directory specification, paths are
1258 not returned but assigned to helm environment variables
1259
1260 :param cluster_name: cluster_name
1261 :return: Dictionary with config_paths and dictionary with helm environment variables
1262 """
1263
1264 @abc.abstractmethod
1265 async def _cluster_init(self, cluster_id, namespace, paths, env):
1266 """
1267 Implements the helm version dependent cluster initialization
1268 """
1269
1270 @abc.abstractmethod
1271 async def _instances_list(self, cluster_id):
1272 """
1273 Implements the helm version dependent helm instances list
1274 """
1275
1276 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001277 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001278 """
1279 Implements the helm version dependent method to obtain services from a helm instance
1280 """
1281
1282 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001283 async def _status_kdu(
1284 self,
1285 cluster_id: str,
1286 kdu_instance: str,
1287 namespace: str = None,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001288 yaml_format: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001289 show_error_log: bool = False,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001290 ) -> Union[str, dict]:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001291 """
1292 Implements the helm version dependent method to obtain status of a helm instance
1293 """
1294
1295 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001296 def _get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001297 self,
1298 kdu_model,
1299 kdu_instance,
1300 namespace,
1301 params_str,
1302 version,
1303 atomic,
1304 timeout,
1305 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001306 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001307 """
1308 Obtain command to be executed to delete the indicated instance
1309 """
1310
1311 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001312 def _get_upgrade_scale_command(
1313 self,
1314 kdu_model,
1315 kdu_instance,
1316 namespace,
1317 count,
1318 version,
1319 atomic,
1320 replicas,
1321 timeout,
1322 resource_name,
1323 kubeconfig,
1324 ) -> str:
1325 """Obtain command to be executed to upgrade the indicated instance."""
1326
1327 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001328 def _get_upgrade_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001329 self,
1330 kdu_model,
1331 kdu_instance,
1332 namespace,
1333 params_str,
1334 version,
1335 atomic,
1336 timeout,
1337 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001338 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001339 """
1340 Obtain command to be executed to upgrade the indicated instance
1341 """
1342
1343 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001344 def _get_rollback_command(
1345 self, kdu_instance, namespace, revision, kubeconfig
1346 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001347 """
1348 Obtain command to be executed to rollback the indicated instance
1349 """
1350
1351 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001352 def _get_uninstall_command(
1353 self, kdu_instance: str, namespace: str, kubeconfig: str
1354 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001355 """
1356 Obtain command to be executed to delete the indicated instance
1357 """
1358
1359 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001360 def _get_inspect_command(
1361 self, show_command: str, kdu_model: str, repo_str: str, version: str
1362 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001363 """Generates the command to obtain the information about an Helm Chart package
1364 (´helm show ...´ command)
1365
1366 Args:
1367 show_command: the second part of the command (`helm show <show_command>`)
1368 kdu_model: The name or path of an Helm Chart
1369 repo_url: Helm Chart repository url
1370 version: constraint with specific version of the Chart to use
1371
1372 Returns:
1373 str: the generated Helm Chart command
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001374 """
1375
1376 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001377 def _get_get_command(
1378 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1379 ):
1380 """Obtain command to be executed to get information about the kdu instance."""
1381
1382 @abc.abstractmethod
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001383 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1384 """
1385 Method call to uninstall cluster software for helm. This method is dependent
1386 of helm version
1387 For Helm v2 it will be called when Tiller must be uninstalled
1388 For Helm v3 it does nothing and does not need to be callled
1389 """
1390
lloretgalleg095392b2020-11-20 11:28:08 +00001391 @abc.abstractmethod
1392 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1393 """
1394 Obtains the cluster repos identifiers
1395 """
1396
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001397 """
1398 ####################################################################################
1399 ################################### P R I V A T E ##################################
1400 ####################################################################################
1401 """
1402
1403 @staticmethod
1404 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1405 if os.path.exists(filename):
1406 return True
1407 else:
1408 msg = "File {} does not exist".format(filename)
1409 if exception_if_not_exists:
1410 raise K8sException(msg)
1411
1412 @staticmethod
1413 def _remove_multiple_spaces(strobj):
1414 strobj = strobj.strip()
1415 while " " in strobj:
1416 strobj = strobj.replace(" ", " ")
1417 return strobj
1418
1419 @staticmethod
1420 def _output_to_lines(output: str) -> list:
1421 output_lines = list()
1422 lines = output.splitlines(keepends=False)
1423 for line in lines:
1424 line = line.strip()
1425 if len(line) > 0:
1426 output_lines.append(line)
1427 return output_lines
1428
1429 @staticmethod
1430 def _output_to_table(output: str) -> list:
1431 output_table = list()
1432 lines = output.splitlines(keepends=False)
1433 for line in lines:
1434 line = line.replace("\t", " ")
1435 line_list = list()
1436 output_table.append(line_list)
1437 cells = line.split(sep=" ")
1438 for cell in cells:
1439 cell = cell.strip()
1440 if len(cell) > 0:
1441 line_list.append(cell)
1442 return output_table
1443
1444 @staticmethod
1445 def _parse_services(output: str) -> list:
1446 lines = output.splitlines(keepends=False)
1447 services = []
1448 for line in lines:
1449 line = line.replace("\t", " ")
1450 cells = line.split(sep=" ")
1451 if len(cells) > 0 and cells[0].startswith("service/"):
1452 elems = cells[0].split(sep="/")
1453 if len(elems) > 1:
1454 services.append(elems[1])
1455 return services
1456
1457 @staticmethod
1458 def _get_deep(dictionary: dict, members: tuple):
1459 target = dictionary
1460 value = None
1461 try:
1462 for m in members:
1463 value = target.get(m)
1464 if not value:
1465 return None
1466 else:
1467 target = value
1468 except Exception:
1469 pass
1470 return value
1471
1472 # find key:value in several lines
1473 @staticmethod
1474 def _find_in_lines(p_lines: list, p_key: str) -> str:
1475 for line in p_lines:
1476 try:
1477 if line.startswith(p_key + ":"):
1478 parts = line.split(":")
1479 the_value = parts[1].strip()
1480 return the_value
1481 except Exception:
1482 # ignore it
1483 pass
1484 return None
1485
1486 @staticmethod
1487 def _lower_keys_list(input_list: list):
1488 """
1489 Transform the keys in a list of dictionaries to lower case and returns a new list
1490 of dictionaries
1491 """
1492 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001493 if input_list:
1494 for dictionary in input_list:
1495 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1496 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001497 return new_list
1498
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001499 async def _local_async_exec(
1500 self,
1501 command: str,
1502 raise_exception_on_error: bool = False,
1503 show_error_log: bool = True,
1504 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001505 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001506 ) -> (str, int):
1507
1508 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001509 self.log.debug(
1510 "Executing async local command: {}, env: {}".format(command, env)
1511 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001512
1513 # split command
1514 command = shlex.split(command)
1515
1516 environ = os.environ.copy()
1517 if env:
1518 environ.update(env)
1519
1520 try:
1521 process = await asyncio.create_subprocess_exec(
garciadeblas82b591c2021-03-24 09:22:13 +01001522 *command,
1523 stdout=asyncio.subprocess.PIPE,
1524 stderr=asyncio.subprocess.PIPE,
1525 env=environ,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001526 )
1527
1528 # wait for command terminate
1529 stdout, stderr = await process.communicate()
1530
1531 return_code = process.returncode
1532
1533 output = ""
1534 if stdout:
1535 output = stdout.decode("utf-8").strip()
1536 # output = stdout.decode()
1537 if stderr:
1538 output = stderr.decode("utf-8").strip()
1539 # output = stderr.decode()
1540
1541 if return_code != 0 and show_error_log:
1542 self.log.debug(
1543 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1544 )
1545 else:
1546 self.log.debug("Return code: {}".format(return_code))
1547
1548 if raise_exception_on_error and return_code != 0:
1549 raise K8sException(output)
1550
1551 if encode_utf8:
1552 output = output.encode("utf-8").strip()
1553 output = str(output).replace("\\n", "\n")
1554
1555 return output, return_code
1556
1557 except asyncio.CancelledError:
1558 raise
1559 except K8sException:
1560 raise
1561 except Exception as e:
1562 msg = "Exception executing command: {} -> {}".format(command, e)
1563 self.log.error(msg)
1564 if raise_exception_on_error:
1565 raise K8sException(e) from e
1566 else:
1567 return "", -1
1568
garciadeblas82b591c2021-03-24 09:22:13 +01001569 async def _local_async_exec_pipe(
1570 self,
1571 command1: str,
1572 command2: str,
1573 raise_exception_on_error: bool = True,
1574 show_error_log: bool = True,
1575 encode_utf8: bool = False,
1576 env: dict = None,
1577 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001578
1579 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1580 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1581 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001582 self.log.debug(
1583 "Executing async local command: {}, env: {}".format(command, env)
1584 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001585
1586 # split command
1587 command1 = shlex.split(command1)
1588 command2 = shlex.split(command2)
1589
1590 environ = os.environ.copy()
1591 if env:
1592 environ.update(env)
1593
1594 try:
1595 read, write = os.pipe()
1596 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1597 os.close(write)
garciadeblas82b591c2021-03-24 09:22:13 +01001598 process_2 = await asyncio.create_subprocess_exec(
1599 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1600 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001601 os.close(read)
1602 stdout, stderr = await process_2.communicate()
1603
1604 return_code = process_2.returncode
1605
1606 output = ""
1607 if stdout:
1608 output = stdout.decode("utf-8").strip()
1609 # output = stdout.decode()
1610 if stderr:
1611 output = stderr.decode("utf-8").strip()
1612 # output = stderr.decode()
1613
1614 if return_code != 0 and show_error_log:
1615 self.log.debug(
1616 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1617 )
1618 else:
1619 self.log.debug("Return code: {}".format(return_code))
1620
1621 if raise_exception_on_error and return_code != 0:
1622 raise K8sException(output)
1623
1624 if encode_utf8:
1625 output = output.encode("utf-8").strip()
1626 output = str(output).replace("\\n", "\n")
1627
1628 return output, return_code
1629 except asyncio.CancelledError:
1630 raise
1631 except K8sException:
1632 raise
1633 except Exception as e:
1634 msg = "Exception executing command: {} -> {}".format(command, e)
1635 self.log.error(msg)
1636 if raise_exception_on_error:
1637 raise K8sException(e) from e
1638 else:
1639 return "", -1
1640
1641 async def _get_service(self, cluster_id, service_name, namespace):
1642 """
1643 Obtains the data of the specified service in the k8cluster.
1644
1645 :param cluster_id: id of a K8s cluster known by OSM
1646 :param service_name: name of the K8s service in the specified namespace
1647 :param namespace: K8s namespace used by the KDU instance
1648 :return: If successful, it will return a service with the following data:
1649 - `name` of the service
1650 - `type` type of service in the k8 cluster
1651 - `ports` List of ports offered by the service, for each port includes at least
1652 name, port, protocol
1653 - `cluster_ip` Internal ip to be used inside k8s cluster
1654 - `external_ip` List of external ips (in case they are available)
1655 """
1656
1657 # init config, env
1658 paths, env = self._init_paths_env(
1659 cluster_name=cluster_id, create_if_not_exist=True
1660 )
1661
1662 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1663 self.kubectl_command, paths["kube_config"], namespace, service_name
1664 )
1665
1666 output, _rc = await self._local_async_exec(
1667 command=command, raise_exception_on_error=True, env=env
1668 )
1669
1670 data = yaml.load(output, Loader=yaml.SafeLoader)
1671
1672 service = {
1673 "name": service_name,
1674 "type": self._get_deep(data, ("spec", "type")),
1675 "ports": self._get_deep(data, ("spec", "ports")),
garciadeblas82b591c2021-03-24 09:22:13 +01001676 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001677 }
1678 if service["type"] == "LoadBalancer":
1679 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1680 ip_list = [elem["ip"] for elem in ip_map_list]
1681 service["external_ip"] = ip_list
1682
1683 return service
1684
aktas867418c2021-10-19 18:26:13 +03001685 async def _exec_get_command(
1686 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1687 ):
1688 """Obtains information about the kdu instance."""
1689
1690 full_command = self._get_get_command(
1691 get_command, kdu_instance, namespace, kubeconfig
1692 )
1693
1694 output, _rc = await self._local_async_exec(command=full_command)
1695
1696 return output
1697
1698 async def _exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001699 self, inspect_command: str, kdu_model: str, repo_url: str = None
1700 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001701 """Obtains information about an Helm Chart package (´helm show´ command)
1702
1703 Args:
1704 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1705 kdu_model: The name or path of an Helm Chart
1706 repo_url: Helm Chart repository url
1707
1708 Returns:
1709 str: the requested info about the Helm Chart package
1710 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001711
1712 repo_str = ""
1713 if repo_url:
1714 repo_str = " --repo {}".format(repo_url)
1715
1716 idx = kdu_model.find("/")
1717 if idx >= 0:
1718 idx += 1
1719 kdu_model = kdu_model[idx:]
1720
aktas867418c2021-10-19 18:26:13 +03001721 kdu_model, version = self._split_version(kdu_model)
1722 if version:
1723 version_str = "--version {}".format(version)
1724 else:
1725 version_str = ""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001726
garciadeblas82b591c2021-03-24 09:22:13 +01001727 full_command = self._get_inspect_command(
aktas867418c2021-10-19 18:26:13 +03001728 inspect_command, kdu_model, repo_str, version_str
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001729 )
1730
aktas867418c2021-10-19 18:26:13 +03001731 output, _rc = await self._local_async_exec(command=full_command)
1732
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001733 return output
1734
aktas867418c2021-10-19 18:26:13 +03001735 async def _get_replica_count_url(
1736 self,
1737 kdu_model: str,
Pedro Escaleira547f8232022-06-03 19:48:46 +01001738 repo_url: str = None,
aktas867418c2021-10-19 18:26:13 +03001739 resource_name: str = None,
1740 ):
1741 """Get the replica count value in the Helm Chart Values.
1742
1743 Args:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001744 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +03001745 repo_url: Helm Chart repository url
1746 resource_name: Resource name
1747
1748 Returns:
1749 True if replicas, False replicaCount
1750 """
1751
1752 kdu_values = yaml.load(
Pedro Escaleira547f8232022-06-03 19:48:46 +01001753 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1754 Loader=yaml.SafeLoader,
aktas867418c2021-10-19 18:26:13 +03001755 )
1756
1757 if not kdu_values:
1758 raise K8sException(
1759 "kdu_values not found for kdu_model {}".format(kdu_model)
1760 )
1761
1762 if resource_name:
1763 kdu_values = kdu_values.get(resource_name, None)
1764
1765 if not kdu_values:
1766 msg = "resource {} not found in the values in model {}".format(
1767 resource_name, kdu_model
1768 )
1769 self.log.error(msg)
1770 raise K8sException(msg)
1771
1772 duplicate_check = False
1773
1774 replica_str = ""
1775 replicas = None
1776
1777 if kdu_values.get("replicaCount", None):
1778 replicas = kdu_values["replicaCount"]
1779 replica_str = "replicaCount"
1780 elif kdu_values.get("replicas", None):
1781 duplicate_check = True
1782 replicas = kdu_values["replicas"]
1783 replica_str = "replicas"
1784 else:
1785 if resource_name:
1786 msg = (
1787 "replicaCount or replicas not found in the resource"
1788 "{} values in model {}. Cannot be scaled".format(
1789 resource_name, kdu_model
1790 )
1791 )
1792 else:
1793 msg = (
1794 "replicaCount or replicas not found in the values"
1795 "in model {}. Cannot be scaled".format(kdu_model)
1796 )
1797 self.log.error(msg)
1798 raise K8sException(msg)
1799
1800 # Control if replicas and replicaCount exists at the same time
1801 msg = "replicaCount and replicas are exists at the same time"
1802 if duplicate_check:
1803 if "replicaCount" in kdu_values:
1804 self.log.error(msg)
1805 raise K8sException(msg)
1806 else:
1807 if "replicas" in kdu_values:
1808 self.log.error(msg)
1809 raise K8sException(msg)
1810
1811 return replicas, replica_str
1812
1813 async def _get_replica_count_instance(
1814 self,
1815 kdu_instance: str,
1816 namespace: str,
1817 kubeconfig: str,
1818 resource_name: str = None,
1819 ):
1820 """Get the replica count value in the instance.
1821
1822 Args:
1823 kdu_instance: The name of the KDU instance
1824 namespace: KDU instance namespace
1825 kubeconfig:
1826 resource_name: Resource name
1827
1828 Returns:
1829 True if replicas, False replicaCount
1830 """
1831
1832 kdu_values = yaml.load(
1833 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1834 Loader=yaml.SafeLoader,
1835 )
1836
1837 replicas = None
1838
1839 if kdu_values:
1840 resource_values = (
1841 kdu_values.get(resource_name, None) if resource_name else None
1842 )
1843 replicas = (
1844 (
1845 resource_values.get("replicaCount", None)
1846 or resource_values.get("replicas", None)
1847 )
1848 if resource_values
1849 else (
1850 kdu_values.get("replicaCount", None)
1851 or kdu_values.get("replicas", None)
1852 )
1853 )
1854
1855 return replicas
1856
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001857 async def _store_status(
1858 self,
1859 cluster_id: str,
1860 operation: str,
1861 kdu_instance: str,
1862 namespace: str = None,
1863 check_every: float = 10,
1864 db_dict: dict = None,
1865 run_once: bool = False,
1866 ):
1867 while True:
1868 try:
1869 await asyncio.sleep(check_every)
1870 detailed_status = await self._status_kdu(
garciadeblas82b591c2021-03-24 09:22:13 +01001871 cluster_id=cluster_id,
1872 kdu_instance=kdu_instance,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001873 yaml_format=False,
garciadeblas82b591c2021-03-24 09:22:13 +01001874 namespace=namespace,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001875 )
1876 status = detailed_status.get("info").get("description")
garciadeblas82b591c2021-03-24 09:22:13 +01001877 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001878 # write status to db
1879 result = await self.write_app_status_to_db(
1880 db_dict=db_dict,
1881 status=str(status),
1882 detailed_status=str(detailed_status),
1883 operation=operation,
1884 )
1885 if not result:
1886 self.log.info("Error writing in database. Task exiting...")
1887 return
1888 except asyncio.CancelledError:
1889 self.log.debug("Task cancelled")
1890 return
1891 except Exception as e:
garciadeblas82b591c2021-03-24 09:22:13 +01001892 self.log.debug(
1893 "_store_status exception: {}".format(str(e)), exc_info=True
1894 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001895 pass
1896 finally:
1897 if run_once:
1898 return
1899
1900 # params for use in -f file
1901 # returns values file option and filename (in order to delete it at the end)
1902 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1903
1904 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001905 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001906
1907 def get_random_number():
1908 r = random.randrange(start=1, stop=99999999)
1909 s = str(r)
1910 while len(s) < 10:
1911 s = "0" + s
1912 return s
1913
1914 params2 = dict()
1915 for key in params:
1916 value = params.get(key)
1917 if "!!yaml" in str(value):
David Garcia513cb2d2022-05-31 11:01:09 +02001918 value = yaml.safe_load(value[7:])
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001919 params2[key] = value
1920
1921 values_file = get_random_number() + ".yaml"
1922 with open(values_file, "w") as stream:
1923 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1924
1925 return "-f {}".format(values_file), values_file
1926
1927 return "", None
1928
1929 # params for use in --set option
1930 @staticmethod
1931 def _params_to_set_option(params: dict) -> str:
1932 params_str = ""
1933 if params and len(params) > 0:
1934 start = True
1935 for key in params:
1936 value = params.get(key, None)
1937 if value is not None:
1938 if start:
1939 params_str += "--set "
1940 start = False
1941 else:
1942 params_str += ","
1943 params_str += "{}={}".format(key, value)
1944 return params_str
1945
1946 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01001947 def generate_kdu_instance_name(**kwargs):
1948 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001949 # check embeded chart (file or dir)
1950 if chart_name.startswith("/"):
1951 # extract file or directory name
David Garcia4ae527e2021-07-26 16:04:59 +02001952 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001953 # check URL
1954 elif "://" in chart_name:
1955 # extract last portion of URL
David Garcia4ae527e2021-07-26 16:04:59 +02001956 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001957
1958 name = ""
1959 for c in chart_name:
1960 if c.isalpha() or c.isnumeric():
1961 name += c
1962 else:
1963 name += "-"
1964 if len(name) > 35:
1965 name = name[0:35]
1966
1967 # if does not start with alpha character, prefix 'a'
1968 if not name[0].isalpha():
1969 name = "a" + name
1970
1971 name += "-"
1972
1973 def get_random_number():
1974 r = random.randrange(start=1, stop=99999999)
1975 s = str(r)
1976 s = s.rjust(10, "0")
1977 return s
1978
1979 name = name + get_random_number()
1980 return name.lower()
aktas867418c2021-10-19 18:26:13 +03001981
1982 def _split_version(self, kdu_model: str) -> (str, str):
1983 version = None
garciadeblas04393192022-06-08 15:39:24 +02001984 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
aktas867418c2021-10-19 18:26:13 +03001985 parts = kdu_model.split(sep=":")
1986 if len(parts) == 2:
1987 version = str(parts[1])
1988 kdu_model = parts[0]
1989 return kdu_model, version
1990
garciadeblas7faf4ec2022-04-08 22:53:25 +02001991 async def _split_repo(self, kdu_model: str) -> str:
1992 repo_name = None
1993 idx = kdu_model.find("/")
1994 if idx >= 0:
1995 repo_name = kdu_model[:idx]
1996 return repo_name
1997
aktas867418c2021-10-19 18:26:13 +03001998 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001999 """Obtain the Helm repository for an Helm Chart
2000
2001 Args:
2002 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2003 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2004
2005 Returns:
2006 str: the repository URL; if Helm Chart is a local one, the function returns None
2007 """
2008
aktas867418c2021-10-19 18:26:13 +03002009 repo_url = None
2010 idx = kdu_model.find("/")
2011 if idx >= 0:
2012 repo_name = kdu_model[:idx]
2013 # Find repository link
2014 local_repo_list = await self.repo_list(cluster_uuid)
2015 for repo in local_repo_list:
2016 repo_url = repo["url"] if repo["name"] == repo_name else None
2017 return repo_url