blob: a4eab4bd45a7b4130748d0711b6f15235ad51c12 [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Initialise the state shared by the Helm based K8s connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """
    # the generic connector keeps db, logger and the update callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # random numbers are used for release name generation; seed from the clock
    random.seed(time.time())

    self.fs = fs

    # kubectl must be present (the check is asked to raise when it is not)
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    # same requirement for the helm executable
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url comes from the configuration; the literal string "None"
    # is treated as "no stable repo configured"
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
lloretgalleg83e55892020-12-17 12:42:11 +000092
Pedro Escaleirab41de172022-04-02 00:44:08 +010093 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000094 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010095 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +000098 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010099
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000105
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and the value returned by the helm
        version specific ``_cluster_init`` (True if connector has installed
        some software in the cluster)
        (on error, an exception will be raised)
    """

    cluster_id = reuse_cluster_uuid if reuse_cluster_uuid else str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The credentials file must never be readable by other users.  The
    # built-in open() has no permission argument (its third positional
    # parameter is `buffering`), so the file is created through os.open()
    # with owner read/write permissions only.
    kube_config = paths["kube_config"]
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(kube_config, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_rw)
    with os.fdopen(fd, "w") as f:
        # os.open() only applies the mode when it creates the file; tighten a
        # pre-existing file as well, before any credential is written to it
        os.chmod(kube_config, owner_rw)
        f.write(k8s_creds)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000153
async def repo_add(
    self,
    cluster_uuid: str,
    name: str,
    url: str,
    repo_type: str = "chart",
    cert: str = None,
    user: str = None,
    password: str = None,
):
    """
    Register a helm repository for a cluster and refresh it.

    :param cluster_uuid: cluster whose helm environment is used
    :param name: name given to the repository
    :param url: repository URL
    :param repo_type: only used in the log message
    :param cert: optional CA certificate content; it is written to a
        ``helmcerts/temp.crt`` file under the cluster directory and passed
        with ``--ca-file``
    :param user: optional user name (``--username``)
    :param password: optional password (``--password``)
    :raises: whatever ``_local_async_exec`` raises when ``helm repo add``
        fails (it is called with raise_exception_on_error=True)
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo add name url
    # Caller supplied values are quoted so that blanks or shell
    # metacharacters cannot change the argument list.
    # NOTE(review): assumes _local_async_exec tokenises the command with
    # shlex rules - confirm against its implementation.
    command = "env KUBECONFIG={} {} repo add {} {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name), shlex.quote(url)
    )

    if cert:
        temp_cert_file = os.path.join(
            self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
        )
        os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
        with open(temp_cert_file, "w") as the_cert:
            the_cert.write(cert)
        command += " --ca-file {}".format(shlex.quote(temp_cert_file))

    if user:
        command += " --username={}".format(shlex.quote(user))

    # the password must not reach the log: log a masked copy of the command
    loggable_command = command
    if password:
        loggable_command += " --password=******"
        command += " --password={}".format(shlex.quote(password))

    self.log.debug("adding repo: {}".format(loggable_command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # helm repo update (best effort: a failure here is not raised)
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000214
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Refresh the local index of one helm repository of a cluster.

    The helm command is run best effort: a failure is not raised
    (raise_exception_on_error=False).

    :param cluster_uuid: cluster whose helm environment is used
    :param name: name of the repository to update
    :param repo_type: only used in the log message
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(
            cluster_uuid, repo_type, name
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    # Same command line as the update performed by repo_add: explicit
    # KUBECONFIG and a quoted repository name.
    # NOTE(review): assumes _local_async_exec tokenises the command with
    # shlex rules - confirm against its implementation.
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
239
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Return the repositories registered in the helm environment of a cluster.

    :param cluster_uuid: cluster whose helm environment is queried
    :return: parsed output of ``helm repo list --output yaml`` passed through
        ``_lower_keys_list``; an empty list when the command fails or prints
        nothing
    """

    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # environment and config file locations for this cluster
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # bring the local directory up to date
    self.fs.sync(from_path=cluster_uuid)

    list_command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # errors are not raised: having no repositories must yield an empty list
    output, _rc = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    if _rc != 0:
        return []
    if not (output and len(output) > 0):
        return []

    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    # helm2 and helm3 differ in key case: unify by lower-casing all keys
    return self._lower_keys_list(parsed_repos)
278
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the helm environment of a cluster.

    :param cluster_uuid: cluster whose helm environment is used
    :param name: name of the repository to remove
    :raises: whatever ``_local_async_exec`` raises when the helm command
        fails (it is called with raise_exception_on_error=True)
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # The repository name is quoted so that blanks or shell metacharacters
    # cannot change the argument list.
    # NOTE(review): assumes _local_async_exec tokenises the command with
    # shlex rules - confirm against its implementation.
    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000301
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: when releases exist, uninstall them; without it the
        releases and the software installed in the cluster are left in place
    :param uninstall_sw: when True, uninstall the existing releases (only if
        ``force``) and then call the helm version specific ``_uninstall_sw``
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # bound before the try so the error message below never
                    # reports a stale or undefined release name
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_uuid)
                self.log.warning(msg)
                uninstall_sw = (
                    False  # Allow to remove k8s cluster without removing Tiller
                )

    if uninstall_sw:
        await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
373
garciadeblas04393192022-06-08 15:39:24 +0200374 def _is_helm_chart_a_file(self, chart_name: str):
375 return chart_name.count("/") > 1
376
lloretgalleg095392b2020-11-20 11:28:08 +0000377 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100378 self,
379 cluster_id: str,
380 kdu_model: str,
381 paths: dict,
382 env: dict,
383 kdu_instance: str,
384 atomic: bool = True,
385 timeout: float = 300,
386 params: dict = None,
387 db_dict: dict = None,
388 kdu_name: str = None,
389 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000390 ):
bravof7bd5c6a2021-11-17 11:14:57 -0300391 # init env, paths
392 paths, env = self._init_paths_env(
393 cluster_name=cluster_id, create_if_not_exist=True
394 )
395
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000396 # params to str
397 params_str, file_to_delete = self._params_to_file_option(
398 cluster_id=cluster_id, params=params
399 )
400
401 # version
aktas867418c2021-10-19 18:26:13 +0300402 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000403
garciadeblas7faf4ec2022-04-08 22:53:25 +0200404 repo = self._split_repo(kdu_model)
405 if repo:
406 self.repo_update(cluster_id, repo)
407
garciadeblas82b591c2021-03-24 09:22:13 +0100408 command = self._get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300409 kdu_model,
410 kdu_instance,
411 namespace,
412 params_str,
413 version,
414 atomic,
415 timeout,
416 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100417 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000418
419 self.log.debug("installing: {}".format(command))
420
421 if atomic:
422 # exec helm in a task
423 exec_task = asyncio.ensure_future(
424 coro_or_future=self._local_async_exec(
425 command=command, raise_exception_on_error=False, env=env
426 )
427 )
428
429 # write status in another task
430 status_task = asyncio.ensure_future(
431 coro_or_future=self._store_status(
432 cluster_id=cluster_id,
433 kdu_instance=kdu_instance,
434 namespace=namespace,
435 db_dict=db_dict,
436 operation="install",
437 run_once=False,
438 )
439 )
440
441 # wait for execution task
442 await asyncio.wait([exec_task])
443
444 # cancel status task
445 status_task.cancel()
446
447 output, rc = exec_task.result()
448
449 else:
450
451 output, rc = await self._local_async_exec(
452 command=command, raise_exception_on_error=False, env=env
453 )
454
455 # remove temporal values yaml file
456 if file_to_delete:
457 os.remove(file_to_delete)
458
459 # write final status
460 await self._store_status(
461 cluster_id=cluster_id,
462 kdu_instance=kdu_instance,
463 namespace=namespace,
464 db_dict=db_dict,
465 operation="install",
466 run_once=True,
467 check_every=0,
468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance with helm.

    :param cluster_uuid: cluster where the release lives
    :param kdu_instance: release name
    :param kdu_model: chart reference, optionally carrying a version (it is
        split with ``_split_version``)
    :param atomic: passed to the upgrade command; when set, the status is
        also stored by a background task while helm runs
    :param timeout: passed to the upgrade command
    :param params: values passed to the chart through a temporary file
    :param db_dict: passed to ``_store_status``
    :return: revision reported by ``get_instance_info`` after the upgrade,
        or 0 when the instance is not found afterwards
    :raises K8sException: when the instance does not exist or the helm
        command returns a non zero code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    try:
        # version
        kdu_model, version = self._split_version(kdu_model)

        repo = self._split_repo(kdu_model)
        if repo:
            # repo_update is a coroutine: without the await it never runs
            await self.repo_update(cluster_uuid, repo)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also when something failed
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
594
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    cluster_uuid: str = None,
    kdu_model: str = None,
    atomic: bool = True,
    db_dict: dict = None,
    **kwargs,
):
    """Scale a resource of a deployed Helm chart through a helm upgrade.

    Args:
        kdu_instance: KDU instance name
        scale: Scale to which to set the resource
        resource_name: Resource name
        total_timeout: The time, in seconds, to wait
        cluster_uuid: The UUID of the cluster
        kdu_model: The chart reference
        atomic: passed to the scale command; when set, the status is also
            stored by a background task while helm runs
        db_dict: Dictionary passed to the status storage
        kwargs: Additional parameters

    Returns:
        True once the command has finished with return code 0

    Raises:
        K8sException: the instance or its repository cannot be found, or
            the helm command returns a non zero code
    """

    if resource_name:
        trace = "scaling resource {} in model {} (cluster {})".format(
            resource_name, kdu_model, cluster_uuid
        )
    else:
        trace = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
    self.log.debug(trace)

    # the instance is needed to learn its namespace
    # (get_instance_info function calls the sync command)
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    kdu_model, version = self._split_version(kdu_model)

    repo_url = await self._find_repo(kdu_model, cluster_uuid)
    if not repo_url:
        raise K8sException(
            "Repository not found for kdu_model {}".format(kdu_model)
        )

    # only the name of the replica parameter is needed here
    _, replica_str = await self._get_replica_count_url(
        kdu_model, repo_url, resource_name
    )

    command = self._get_upgrade_scale_command(
        kdu_model,
        kdu_instance,
        instance_info["namespace"],
        scale,
        version,
        atomic,
        replica_str,
        total_timeout,
        resource_name,
        paths["kube_config"],
    )
    self.log.debug("scaling: {}".format(command))

    # arguments shared by the periodic and the final status writes
    status_args = dict(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="scale",
    )
    exec_args = dict(command=command, raise_exception_on_error=False, env=env)

    if not atomic:
        output, rc = await self._local_async_exec(**exec_args)
    else:
        # helm runs in one task while another one keeps writing the status
        helm_task = asyncio.ensure_future(self._local_async_exec(**exec_args))
        watcher_task = asyncio.ensure_future(
            self._store_status(run_once=False, **status_args)
        )

        await asyncio.wait([helm_task])

        # helm is done: the watcher is no longer needed
        watcher_task.cancel()
        output, rc = helm_task.result()

    # final status
    await self._store_status(run_once=True, check_every=0, **status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    return True
aktas2962f3e2021-03-15 11:05:35 +0300723
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    cluster_uuid: str,
    kdu_model: str,
    **kwargs,
) -> int:
    """Return the replica count of a resource of a KDU instance.

    The count is first looked up in the deployed instance; when that gives
    nothing, the value obtained from the chart in its repository is used.

    Args:
        cluster_uuid: The UUID of the cluster
        resource_name: Resource name
        kdu_instance: KDU instance name
        kdu_model: The name or path of a bundle
        kwargs: Additional parameters

    Returns:
        Resource instance count

    Raises:
        K8sException: the instance or its repository cannot be found, or no
            replica count could be obtained
    """

    self.log.debug(
        "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
    )

    # the instance is needed to learn its namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    replicas = await self._get_replica_count_instance(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )
    if replicas:
        return int(replicas)

    # nothing found in the provided values: fall back to the chart default
    repo_url = await self._find_repo(kdu_model, cluster_uuid)
    if not repo_url:
        raise K8sException(
            "Repository not found for kdu_model {}".format(kdu_model)
        )

    replicas, _ = await self._get_replica_count_url(
        kdu_model, repo_url, resource_name
    )
    if replicas:
        return int(replicas)

    msg = "Replica count not found. Cannot be scaled"
    self.log.error(msg)
    raise K8sException(msg)
aktas2962f3e2021-03-15 11:05:35 +0300781
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a KDU instance back to a given revision with helm.

    :param cluster_uuid: cluster where the release lives
    :param kdu_instance: release name
    :param revision: revision passed to the rollback command
    :param db_dict: passed to the status storage
    :return: revision reported by ``get_instance_info`` after the rollback,
        or 0 when the instance is not found afterwards
    :raises K8sException: when the instance does not exist or the helm
        command returns a non zero code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # bring the local directory up to date
    self.fs.sync(from_path=cluster_uuid)

    # the instance is needed to learn its namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # second sync, after the environment has been initialised
    self.fs.sync(from_path=cluster_uuid)

    command = self._get_rollback_command(
        kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
    )
    self.log.debug("rolling_back: {}".format(command))

    # arguments shared by the periodic and the final status writes
    status_args = dict(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
    )

    # helm runs in one task while another one keeps writing the status
    helm_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    watcher_task = asyncio.ensure_future(
        self._store_status(run_once=False, **status_args)
    )

    await asyncio.wait([helm_task])

    # helm is done: the watcher is no longer needed
    watcher_task.cancel()
    output, rc = helm_task.result()

    # final status
    await self._store_status(run_once=True, check_every=0, **status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    # report the revision the instance has now
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0
    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
868
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Remove an existing KDU instance from a cluster. The removal command is
    built by the helm-version specific `_get_uninstall_command`
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance is not found in the cluster (nothing to
        remove); otherwise the command output parsed by `_output_to_table`
    """

    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_uuid)
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the release record is needed only for its namespace
    release = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release:
        # a missing release is not treated as an error
        self.log.warning("kdu_instance {} not found".format(kdu_instance))
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    uninstall_cmd = self._get_uninstall_command(
        kdu_instance, release["namespace"], paths["kube_config"]
    )
    cmd_output, _ = await self._local_async_exec(
        command=uninstall_cmd, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(cmd_output)
914
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the releases deployed in a cluster, as reported by the
    helm-version specific `_instances_list`.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever `_instances_list` returns for that cluster
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # bring the local dir up to date before running the internal command
    self.fs.sync(from_path=cluster_uuid)

    releases = await self._instances_list(cluster_uuid)

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
935
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a single release by name among the releases of a cluster.

    :param cluster_uuid: cluster identifier passed on to `instances_list`
    :param kdu_instance: release name to look for
    :return: the first entry whose "name" equals kdu_instance, or None
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
943
async def upgrade_charm(
    self,
    ee_id: str = None,
    path: str = None,
    charm_id: str = None,
    charm_type: str = None,
    timeout: float = None,
) -> str:
    """Charm upgrade entry point; not supported for KDUs deployed with Helm.

    This implementation never performs an upgrade: it unconditionally raises
    K8sException and none of the arguments is used.

    Args:
        ee_id:  Execution environment id
        path:   Local path to the charm
        charm_id: charm-id
        charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
        timeout: (Float) Timeout for the ns update operation

    Raises:
        K8sException: always
    """
    raise K8sException("KDUs deployed with Helm do not support charm upgrade")
965
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action); not supported for KDUs deployed with Helm.

    This implementation unconditionally raises K8sException and none of the
    arguments is used.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    raise K8sException(
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
992
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Return the services defined for the specified kdu instance.

    The service names come from the helm-version specific `_get_services`;
    each name is then resolved, one after another, with `_get_service`.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, a list with one `_get_service` result per service
        name (see `_get_service` for the fields of each entry)
    """

    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # names first, details afterwards
    names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )
    services = [
        await self._get_service(cluster_uuid, name, namespace) for name in names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return services
1040
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Return the data of one service of the cluster, as built by `_get_service`.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param service_name: name of the K8s service
    :param namespace: K8s namespace where the service lives
    :return: the result of `_get_service` for that service
    """

    debug_msg = "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
        service_name, namespace, cluster_uuid
    )
    self.log.debug(debug_msg)

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    service_data = await self._get_service(cluster_uuid, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_data
1060
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
) -> Union[str, dict]:
    """
    Retrieve the current state of a given KDU instance.

    The instance is first looked up in the release list of the cluster to
    obtain its namespace; the status itself is produced by the helm-version
    specific `_status_kdu`.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param yaml_format: whether the status shall be returned as a YAML string
        (True) or as a dictionary (False); passed through to `_status_kdu`
    :param kwargs: Additional parameters (None yet)
    :return: the status returned by `_status_kdu`. According to the original
        documentation of this call it carries:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
              - UNKNOWN
              - DEPLOYED
              - DELETED
              - SUPERSEDED
              - FAILED or
              - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
              and the status of those resources
        - Last `deployment_time`.
    :raises K8sException: if no release named kdu_instance exists in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    instances = await self._instances_list(cluster_id=cluster_uuid)
    instance = next(
        (entry for entry in instances if entry.get("name") == kdu_instance), None
    )
    if instance is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
1123
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Return the output of the "values" get command for a KDU instance.

    :param kdu_instance: unique name of the KDU instance
    :param namespace: namespace passed to the get command
    :param kubeconfig: kubeconfig passed to the get command
    :return: whatever `_exec_get_command` returns
    """

    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    values_output = await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
    return values_output
1136
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the output of the "values" inspect command for a kdu model.

    :param kdu_model: chart reference handed to `_exec_inspect_command`
    :param repo_url: optional repository url
    :return: whatever `_exec_inspect_command` returns
    """

    debug_msg = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    inspect_output = await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspect_output
1148
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the output of the "readme" inspect command for a kdu model.

    :param kdu_model: chart reference handed to `_exec_inspect_command`
    :param repo_url: optional repository url
    :return: whatever `_exec_inspect_command` returns
    """

    debug_msg = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    readme_output = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme_output
1158
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repos configured locally for a cluster with the repos
    registered for that cluster in the database.

    - A database repo that is missing locally is added.
    - A database repo whose local url differs is removed and added again.
    - A local repo that is not in the database is removed, except the repo
      named "stable". A failure here is only logged (best effort).

    :param cluster_uuid: cluster identifier
    :return: tuple (deleted_repo_list, added_repo_dict).
        deleted_repo_list holds the database `_id` of repos removed because
        their url changed and the *name* of local repos removed because they
        are not in the database. added_repo_dict maps database `_id` to name.
    :raises K8sException: if adding (or re-adding) a database repo fails
    :raises Exception: on any other error while synchronizing
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # resolved before the try so that the error message below can
            # always reference it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and add again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo; the CA certificate is passed only when the
                    # database entry carries one
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    cert_kwargs = (
                        {"cert": db_repo["ca_cert"]} if "ca_cert" in db_repo else {}
                    )
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"], **cert_kwargs
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(repo_id, repr(e))
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a failed removal must not abort the sync
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other failure is logged and reported to the caller
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
1234
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001235 def _get_db_repos_dict(self, repo_ids: list):
1236 db_repos_dict = {}
1237 for repo_id in repo_ids:
1238 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1239 db_repos_dict[db_repo["name"]] = db_repo
1240 return db_repos_dict
1241
1242 """
1243 ####################################################################################
1244 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1245 ####################################################################################
1246 """
1247
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    Also creates helm3 dirs according to new directory specification, paths are
    not returned but assigned to helm environment variables

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably whether missing directories are
        created -- to be confirmed in the concrete implementations
    :return: Dictionary with config_paths and dictionary with helm environment
        variables. Callers in this class unpack it as `paths, env` and read
        `paths["kube_config"]`.
    """
1258
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    To be provided by each helm-version specific subclass; `paths` and `env`
    are presumably the values returned by `_init_paths_env` (to be confirmed
    at the call site).
    """
1264
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    Callers in this class iterate the result and read the "name" and
    "namespace" keys of each entry.
    """
1270
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    `get_services` iterates the result and passes each element to
    `_get_service` as a service name.
    """
1276
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict]:
    """
    Implements the helm version dependent method to obtain status of a helm instance

    :param cluster_id: cluster identifier
    :param kdu_instance: unique name of the KDU instance
    :param namespace: namespace of the instance
    :param yaml_format: presumably selects a YAML string (True) or a
        dictionary (False) as return value, matching the return annotation
    :param show_error_log: presumably controls error logging of the underlying
        command -- to be confirmed in the concrete implementations
    """
1289
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance
    """
1305
@abc.abstractmethod
def _get_upgrade_scale_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    count,
    version,
    atomic,
    replicas,
    timeout,
    resource_name,
    kubeconfig,
) -> str:
    """Obtain command to be executed to upgrade the indicated instance.

    Judging by its name and the `count` / `replicas` / `resource_name`
    parameters this is the variant used for scaling -- to be confirmed in the
    concrete implementations.
    """
1321
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    Takes the same parameters as `_get_install_command`.
    """
1337
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    The returned string is run through `_local_async_exec` by the rollback
    operation of this class.
    """
1345
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    The returned string is run through `_local_async_exec` by `uninstall`.
    """
1353
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain command to be executed to obtain information about the kdu

    `_exec_inspect_command` calls this with `repo_str` already formatted as
    " --repo <url>" (or "") and `version` already formatted as
    "--version <version>" (or "").
    """
1361
@abc.abstractmethod
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """Obtain command to be executed to get information about the kdu instance.

    The returned string is run through `_local_async_exec` by
    `_exec_get_command`.
    """
1367
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    on the helm version
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called
    """
1376
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    `synchronize_repos` feeds the result to `_get_db_repos_dict`, which uses
    each element as the `_id` of a "k8srepos" record.
    """
1382
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001383 """
1384 ####################################################################################
1385 ################################### P R I V A T E ##################################
1386 ####################################################################################
1387 """
1388
1389 @staticmethod
1390 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1391 if os.path.exists(filename):
1392 return True
1393 else:
1394 msg = "File {} does not exist".format(filename)
1395 if exception_if_not_exists:
1396 raise K8sException(msg)
1397
1398 @staticmethod
1399 def _remove_multiple_spaces(strobj):
1400 strobj = strobj.strip()
1401 while " " in strobj:
1402 strobj = strobj.replace(" ", " ")
1403 return strobj
1404
1405 @staticmethod
1406 def _output_to_lines(output: str) -> list:
1407 output_lines = list()
1408 lines = output.splitlines(keepends=False)
1409 for line in lines:
1410 line = line.strip()
1411 if len(line) > 0:
1412 output_lines.append(line)
1413 return output_lines
1414
1415 @staticmethod
1416 def _output_to_table(output: str) -> list:
1417 output_table = list()
1418 lines = output.splitlines(keepends=False)
1419 for line in lines:
1420 line = line.replace("\t", " ")
1421 line_list = list()
1422 output_table.append(line_list)
1423 cells = line.split(sep=" ")
1424 for cell in cells:
1425 cell = cell.strip()
1426 if len(cell) > 0:
1427 line_list.append(cell)
1428 return output_table
1429
1430 @staticmethod
1431 def _parse_services(output: str) -> list:
1432 lines = output.splitlines(keepends=False)
1433 services = []
1434 for line in lines:
1435 line = line.replace("\t", " ")
1436 cells = line.split(sep=" ")
1437 if len(cells) > 0 and cells[0].startswith("service/"):
1438 elems = cells[0].split(sep="/")
1439 if len(elems) > 1:
1440 services.append(elems[1])
1441 return services
1442
1443 @staticmethod
1444 def _get_deep(dictionary: dict, members: tuple):
1445 target = dictionary
1446 value = None
1447 try:
1448 for m in members:
1449 value = target.get(m)
1450 if not value:
1451 return None
1452 else:
1453 target = value
1454 except Exception:
1455 pass
1456 return value
1457
1458 # find key:value in several lines
1459 @staticmethod
1460 def _find_in_lines(p_lines: list, p_key: str) -> str:
1461 for line in p_lines:
1462 try:
1463 if line.startswith(p_key + ":"):
1464 parts = line.split(":")
1465 the_value = parts[1].strip()
1466 return the_value
1467 except Exception:
1468 # ignore it
1469 pass
1470 return None
1471
1472 @staticmethod
1473 def _lower_keys_list(input_list: list):
1474 """
1475 Transform the keys in a list of dictionaries to lower case and returns a new list
1476 of dictionaries
1477 """
1478 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001479 if input_list:
1480 for dictionary in input_list:
1481 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1482 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001483 return new_list
1484
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command as a subprocess (no shell) and wait for it to finish.

    :param command: command line; runs of spaces are collapsed and the result
        is split into arguments with shlex
    :param raise_exception_on_error: raise K8sException when the command
        exits with a non-zero code or cannot be executed
    :param show_error_log: log the output together with the return code when
        the command fails
    :param encode_utf8: post-process the output through encode("utf-8") /
        str() (see the note in the body)
    :param env: extra environment variables, applied on top of a copy of the
        current process environment
    :return: tuple (output, return_code). With raise_exception_on_error unset,
        an unexpected error while executing yields ("", -1).
    :raises K8sException: see raise_exception_on_error
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        # NOTE(review): any stderr content replaces the stdout content, also
        # when the command succeeded -- confirm that callers parsing the
        # output never receive warnings instead of the expected data
        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            # str() of a bytes object gives its "b'...'" representation, in
            # which line breaks appear as the two characters "\n"; those are
            # turned back into real line breaks
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        # cancellation must reach the caller untouched
        raise
    except K8sException:
        # raised above for a non-zero return code
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1554
garciadeblas82b591c2021-03-24 09:22:13 +01001555 async def _local_async_exec_pipe(
1556 self,
1557 command1: str,
1558 command2: str,
1559 raise_exception_on_error: bool = True,
1560 show_error_log: bool = True,
1561 encode_utf8: bool = False,
1562 env: dict = None,
1563 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001564
1565 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1566 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1567 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001568 self.log.debug(
1569 "Executing async local command: {}, env: {}".format(command, env)
1570 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001571
1572 # split command
1573 command1 = shlex.split(command1)
1574 command2 = shlex.split(command2)
1575
1576 environ = os.environ.copy()
1577 if env:
1578 environ.update(env)
1579
1580 try:
1581 read, write = os.pipe()
1582 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1583 os.close(write)
garciadeblas82b591c2021-03-24 09:22:13 +01001584 process_2 = await asyncio.create_subprocess_exec(
1585 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1586 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001587 os.close(read)
1588 stdout, stderr = await process_2.communicate()
1589
1590 return_code = process_2.returncode
1591
1592 output = ""
1593 if stdout:
1594 output = stdout.decode("utf-8").strip()
1595 # output = stdout.decode()
1596 if stderr:
1597 output = stderr.decode("utf-8").strip()
1598 # output = stderr.decode()
1599
1600 if return_code != 0 and show_error_log:
1601 self.log.debug(
1602 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1603 )
1604 else:
1605 self.log.debug("Return code: {}".format(return_code))
1606
1607 if raise_exception_on_error and return_code != 0:
1608 raise K8sException(output)
1609
1610 if encode_utf8:
1611 output = output.encode("utf-8").strip()
1612 output = str(output).replace("\\n", "\n")
1613
1614 return output, return_code
1615 except asyncio.CancelledError:
1616 raise
1617 except K8sException:
1618 raise
1619 except Exception as e:
1620 msg = "Exception executing command: {} -> {}".format(command, e)
1621 self.log.error(msg)
1622 if raise_exception_on_error:
1623 raise K8sException(e) from e
1624 else:
1625 return "", -1
1626
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The service is read with `kubectl get service <name> -o=yaml` and the
    relevant fields are picked from the parsed YAML.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (only for services of type
    LoadBalancer; empty while no ip has been assigned)
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # the ingress list is absent until the load balancer is provisioned,
        # and an entry may carry only a hostname instead of an ip
        ingress_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            entry["ip"] for entry in ingress_list if entry.get("ip")
        ]

    return service
1670
aktas867418c2021-10-19 18:26:13 +03001671 async def _exec_get_command(
1672 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1673 ):
1674 """Obtains information about the kdu instance."""
1675
1676 full_command = self._get_get_command(
1677 get_command, kdu_instance, namespace, kubeconfig
1678 )
1679
1680 output, _rc = await self._local_async_exec(command=full_command)
1681
1682 return output
1683
1684 async def _exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001685 self, inspect_command: str, kdu_model: str, repo_url: str = None
1686 ):
aktas867418c2021-10-19 18:26:13 +03001687 """Obtains information about a kdu, no cluster (no env)."""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001688
1689 repo_str = ""
1690 if repo_url:
1691 repo_str = " --repo {}".format(repo_url)
1692
1693 idx = kdu_model.find("/")
1694 if idx >= 0:
1695 idx += 1
1696 kdu_model = kdu_model[idx:]
1697
aktas867418c2021-10-19 18:26:13 +03001698 kdu_model, version = self._split_version(kdu_model)
1699 if version:
1700 version_str = "--version {}".format(version)
1701 else:
1702 version_str = ""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001703
garciadeblas82b591c2021-03-24 09:22:13 +01001704 full_command = self._get_inspect_command(
aktas867418c2021-10-19 18:26:13 +03001705 inspect_command, kdu_model, repo_str, version_str
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001706 )
1707
aktas867418c2021-10-19 18:26:13 +03001708 output, _rc = await self._local_async_exec(command=full_command)
1709
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001710 return output
1711
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str,
    resource_name: str = None,
):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of a bundle
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica count is looked
            up under this key of the values instead of at the top level

    Returns:
        Tuple ``(replicas, replica_str)``: the replica count found in the
        values and the name of the key holding it ("replicaCount" or
        "replicas")

    Raises:
        K8sException: if the values are empty, the resource is not found in
            them, neither key holds a value, or both keys are present
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
    )

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

        if not kdu_values:
            msg = "resource {} not found in the values in model {}".format(
                resource_name, kdu_model
            )
            self.log.error(msg)
            raise K8sException(msg)

    replicas = None
    replica_str = ""

    # "replicaCount" takes precedence over "replicas". The test is against
    # None (not truthiness) so that a replica count of 0 is still recognised.
    for candidate in ("replicaCount", "replicas"):
        if kdu_values.get(candidate) is not None:
            replicas = kdu_values[candidate]
            replica_str = candidate
            break

    if replicas is None:
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time
    if "replicaCount" in kdu_values and "replicas" in kdu_values:
        msg = "replicaCount and replicas are exists at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    return replicas, replica_str
1788
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
):
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: value forwarded to ``get_values_kdu``
        resource_name: Resource name; when given and present (non-empty) in
            the instance values, the replica count is read from that entry,
            otherwise from the top level of the values

    Returns:
        The value of "replicaCount" or, failing that, "replicas"; None when
        the instance has no values or neither key holds a value
    """

    kdu_values = yaml.load(
        await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
        Loader=yaml.SafeLoader,
    )

    if not kdu_values:
        return None

    resource_values = kdu_values.get(resource_name, None) if resource_name else None
    values = resource_values if resource_values else kdu_values

    # Compare against None instead of chaining with "or": an instance scaled
    # to 0 replicas must report 0, not fall through to the next key / None.
    for key in ("replicaCount", "replicas"):
        replicas = values.get(key, None)
        if replicas is not None:
            return replicas

    return None
1832
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001833 async def _store_status(
1834 self,
1835 cluster_id: str,
1836 operation: str,
1837 kdu_instance: str,
1838 namespace: str = None,
1839 check_every: float = 10,
1840 db_dict: dict = None,
1841 run_once: bool = False,
1842 ):
1843 while True:
1844 try:
1845 await asyncio.sleep(check_every)
1846 detailed_status = await self._status_kdu(
garciadeblas82b591c2021-03-24 09:22:13 +01001847 cluster_id=cluster_id,
1848 kdu_instance=kdu_instance,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001849 yaml_format=False,
garciadeblas82b591c2021-03-24 09:22:13 +01001850 namespace=namespace,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001851 )
1852 status = detailed_status.get("info").get("description")
garciadeblas82b591c2021-03-24 09:22:13 +01001853 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001854 # write status to db
1855 result = await self.write_app_status_to_db(
1856 db_dict=db_dict,
1857 status=str(status),
1858 detailed_status=str(detailed_status),
1859 operation=operation,
1860 )
1861 if not result:
1862 self.log.info("Error writing in database. Task exiting...")
1863 return
1864 except asyncio.CancelledError:
1865 self.log.debug("Task cancelled")
1866 return
1867 except Exception as e:
garciadeblas82b591c2021-03-24 09:22:13 +01001868 self.log.debug(
1869 "_store_status exception: {}".format(str(e)), exc_info=True
1870 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001871 pass
1872 finally:
1873 if run_once:
1874 return
1875
1876 # params for use in -f file
1877 # returns values file option and filename (in order to delete it at the end)
1878 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1879
1880 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001881 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001882
1883 def get_random_number():
1884 r = random.randrange(start=1, stop=99999999)
1885 s = str(r)
1886 while len(s) < 10:
1887 s = "0" + s
1888 return s
1889
1890 params2 = dict()
1891 for key in params:
1892 value = params.get(key)
1893 if "!!yaml" in str(value):
David Garcia513cb2d2022-05-31 11:01:09 +02001894 value = yaml.safe_load(value[7:])
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001895 params2[key] = value
1896
1897 values_file = get_random_number() + ".yaml"
1898 with open(values_file, "w") as stream:
1899 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1900
1901 return "-f {}".format(values_file), values_file
1902
1903 return "", None
1904
1905 # params for use in --set option
1906 @staticmethod
1907 def _params_to_set_option(params: dict) -> str:
1908 params_str = ""
1909 if params and len(params) > 0:
1910 start = True
1911 for key in params:
1912 value = params.get(key, None)
1913 if value is not None:
1914 if start:
1915 params_str += "--set "
1916 start = False
1917 else:
1918 params_str += ","
1919 params_str += "{}={}".format(key, value)
1920 return params_str
1921
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """Generate a KDU instance name from the chart reference.

    Args:
        **kwargs: must contain ``kdu_model``, the chart reference (chart name,
            absolute file/directory path or URL)

    Returns:
        Lower-case name made of the sanitized chart name (at most 35
        characters, prefixed with "a" if it does not start with a letter),
        a hyphen and a 10-digit zero-padded random number
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last path portion
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    # Keep ASCII letters and digits only; anything else (including non-ASCII
    # letters, which are not valid in a release name) becomes a hyphen.
    name = "".join(
        c if (ord(c) < 128 and c.isalnum()) else "-" for c in chart_name
    )[:35]

    # if empty or does not start with alpha character, prefix 'a'
    # (an empty name used to raise IndexError on name[0])
    if not name or not name[0].isalpha():
        name = "a" + name

    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_number).lower()
aktas867418c2021-10-19 18:26:13 +03001957
1958 def _split_version(self, kdu_model: str) -> (str, str):
1959 version = None
garciadeblas04393192022-06-08 15:39:24 +02001960 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
aktas867418c2021-10-19 18:26:13 +03001961 parts = kdu_model.split(sep=":")
1962 if len(parts) == 2:
1963 version = str(parts[1])
1964 kdu_model = parts[0]
1965 return kdu_model, version
1966
garciadeblas7faf4ec2022-04-08 22:53:25 +02001967 async def _split_repo(self, kdu_model: str) -> str:
1968 repo_name = None
1969 idx = kdu_model.find("/")
1970 if idx >= 0:
1971 repo_name = kdu_model[:idx]
1972 return repo_name
1973
aktas867418c2021-10-19 18:26:13 +03001974 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1975 repo_url = None
1976 idx = kdu_model.find("/")
1977 if idx >= 0:
1978 repo_name = kdu_model[:idx]
1979 # Find repository link
1980 local_repo_list = await self.repo_list(cluster_uuid)
1981 for repo in local_repo_list:
1982 repo_url = repo["url"] if repo["name"] == repo_name else None
1983 return repo_url