blob: d446b9b4ea280e02b1ac7268bc95707f7ae2d834 [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
David Garcia4395cfa2021-05-28 16:21:51 +020073 self.config = EnvironConfig()
lloretgalleg1c83f2e2020-10-22 09:12:35 +000074 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
David Garcia4395cfa2021-05-28 16:21:51 +020089 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
lloretgalleg83e55892020-12-17 12:42:11 +000092
Pedro Escaleirab41de172022-04-02 00:44:08 +010093 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000094 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010095 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +000098 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010099
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000105
106 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100107 self,
108 k8s_creds: str,
109 namespace: str = "kube-system",
110 reuse_cluster_uuid=None,
111 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000112 ) -> (str, bool):
113 """
114 It prepares a given K8s cluster environment to run Charts
115
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 '.kube/config'
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200121 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
125 """
126
127 if reuse_cluster_uuid:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100128 cluster_id = reuse_cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000129 else:
130 cluster_id = str(uuid4())
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000131
garciadeblas82b591c2021-03-24 09:22:13 +0100132 self.log.debug(
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
134 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000135
136 paths, env = self._init_paths_env(
137 cluster_name=cluster_id, create_if_not_exist=True
138 )
139 mode = stat.S_IRUSR | stat.S_IWUSR
140 with open(paths["kube_config"], "w", mode) as f:
141 f.write(k8s_creds)
142 os.chmod(paths["kube_config"], 0o600)
143
144 # Code with initialization specific of helm version
145 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
146
147 # sync fs with local data
148 self.fs.reverse_sync(from_path=cluster_id)
149
150 self.log.info("Cluster {} initialized".format(cluster_id))
151
Pedro Escaleirab41de172022-04-02 00:44:08 +0100152 return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000153
154 async def repo_add(
bravof0ab522f2021-11-23 19:33:18 -0300155 self,
156 cluster_uuid: str,
157 name: str,
158 url: str,
159 repo_type: str = "chart",
160 cert: str = None,
161 user: str = None,
162 password: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000163 ):
garciadeblas82b591c2021-03-24 09:22:13 +0100164 self.log.debug(
165 "Cluster {}, adding {} repository {}. URL: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100166 cluster_uuid, repo_type, name, url
garciadeblas82b591c2021-03-24 09:22:13 +0100167 )
168 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000169
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000170 # init_env
171 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100172 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000173 )
174
bravof7bd5c6a2021-11-17 11:14:57 -0300175 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100176 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300177
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000178 # helm repo add name url
bravof0ab522f2021-11-23 19:33:18 -0300179 command = ("env KUBECONFIG={} {} repo add {} {}").format(
bravof7bd5c6a2021-11-17 11:14:57 -0300180 paths["kube_config"], self._helm_command, name, url
181 )
bravof0ab522f2021-11-23 19:33:18 -0300182
183 if cert:
184 temp_cert_file = os.path.join(
Pedro Escaleira1188b5d2022-04-22 18:51:00 +0100185 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
bravof0ab522f2021-11-23 19:33:18 -0300186 )
187 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
188 with open(temp_cert_file, "w") as the_cert:
189 the_cert.write(cert)
190 command += " --ca-file {}".format(temp_cert_file)
191
192 if user:
193 command += " --username={}".format(user)
194
195 if password:
196 command += " --password={}".format(password)
197
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000198 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100199 await self._local_async_exec(
200 command=command, raise_exception_on_error=True, env=env
201 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000202
garciadeblasd4cee8c2022-05-04 10:57:36 +0200203 # helm repo update
garciadeblas069f0a32022-05-04 11:07:41 +0200204 command = "env KUBECONFIG={} {} repo update {}".format(
205 paths["kube_config"], self._helm_command, name
garciadeblasd4cee8c2022-05-04 10:57:36 +0200206 )
207 self.log.debug("updating repo: {}".format(command))
208 await self._local_async_exec(
209 command=command, raise_exception_on_error=False, env=env
210 )
211
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000212 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100213 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000214
garciadeblas7faf4ec2022-04-08 22:53:25 +0200215 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
216 self.log.debug(
217 "Cluster {}, updating {} repository {}".format(
218 cluster_uuid, repo_type, name
219 )
220 )
221
222 # init_env
223 paths, env = self._init_paths_env(
224 cluster_name=cluster_uuid, create_if_not_exist=True
225 )
226
227 # sync local dir
228 self.fs.sync(from_path=cluster_uuid)
229
230 # helm repo update
231 command = "{} repo update {}".format(self._helm_command, name)
232 self.log.debug("updating repo: {}".format(command))
233 await self._local_async_exec(
234 command=command, raise_exception_on_error=False, env=env
235 )
236
237 # sync fs
238 self.fs.reverse_sync(from_path=cluster_uuid)
239
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000240 async def repo_list(self, cluster_uuid: str) -> list:
241 """
242 Get the list of registered repositories
243
244 :return: list of registered repositories: [ (name, url) .... ]
245 """
246
Pedro Escaleirab41de172022-04-02 00:44:08 +0100247 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000248
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000249 # config filename
250 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100251 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000252 )
253
bravof7bd5c6a2021-11-17 11:14:57 -0300254 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100255 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300256
257 command = "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths["kube_config"], self._helm_command
259 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000260
261 # Set exception to false because if there are no repos just want an empty list
262 output, _rc = await self._local_async_exec(
263 command=command, raise_exception_on_error=False, env=env
264 )
265
266 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100267 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000268
269 if _rc == 0:
270 if output and len(output) > 0:
271 repos = yaml.load(output, Loader=yaml.SafeLoader)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self._lower_keys_list(repos)
274 else:
275 return []
276 else:
277 return []
278
279 async def repo_remove(self, cluster_uuid: str, name: str):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100280 self.log.debug(
281 "remove {} repositories for cluster {}".format(name, cluster_uuid)
282 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000283
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000284 # init env, paths
285 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100286 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000287 )
288
bravof7bd5c6a2021-11-17 11:14:57 -0300289 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100290 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300291
292 command = "env KUBECONFIG={} {} repo remove {}".format(
293 paths["kube_config"], self._helm_command, name
294 )
garciadeblas82b591c2021-03-24 09:22:13 +0100295 await self._local_async_exec(
296 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000297 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000298
299 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100300 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000301
302 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100303 self,
304 cluster_uuid: str,
305 force: bool = False,
306 uninstall_sw: bool = False,
307 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000308 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200309 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000310
David Garciaeb8943a2021-04-12 12:07:37 +0200311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
312
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
318 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100319 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100320 self.log.debug(
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100322 cluster_uuid, uninstall_sw
garciadeblas82b591c2021-03-24 09:22:13 +0100323 )
324 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000325
326 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100327 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000328
329 # uninstall releases if needed.
330 if uninstall_sw:
331 releases = await self.instances_list(cluster_uuid=cluster_uuid)
332 if len(releases) > 0:
333 if force:
334 for r in releases:
335 try:
336 kdu_instance = r.get("name")
337 chart = r.get("chart")
338 self.log.debug(
339 "Uninstalling {} -> {}".format(chart, kdu_instance)
340 )
341 await self.uninstall(
342 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
343 )
344 except Exception as e:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
347 # raised an error
348 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100349 "Error uninstalling release {}: {}".format(
350 kdu_instance, e
351 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000352 )
353 else:
354 msg = (
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
Pedro Escaleirab41de172022-04-02 00:44:08 +0100356 ).format(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000357 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100358 uninstall_sw = (
359 False # Allow to remove k8s cluster without removing Tiller
360 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000361
362 if uninstall_sw:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100363 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000364
365 # delete cluster directory
Pedro Escaleirab41de172022-04-02 00:44:08 +0100366 self.log.debug("Removing directory {}".format(cluster_uuid))
367 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000368 # Remove also local directorio if still exist
Pedro Escaleirab41de172022-04-02 00:44:08 +0100369 direct = self.fs.path + "/" + cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000370 shutil.rmtree(direct, ignore_errors=True)
371
372 return True
373
lloretgalleg095392b2020-11-20 11:28:08 +0000374 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100375 self,
376 cluster_id: str,
377 kdu_model: str,
378 paths: dict,
379 env: dict,
380 kdu_instance: str,
381 atomic: bool = True,
382 timeout: float = 300,
383 params: dict = None,
384 db_dict: dict = None,
385 kdu_name: str = None,
386 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000387 ):
bravof7bd5c6a2021-11-17 11:14:57 -0300388 # init env, paths
389 paths, env = self._init_paths_env(
390 cluster_name=cluster_id, create_if_not_exist=True
391 )
392
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000393 # params to str
394 params_str, file_to_delete = self._params_to_file_option(
395 cluster_id=cluster_id, params=params
396 )
397
398 # version
aktas867418c2021-10-19 18:26:13 +0300399 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000400
garciadeblas7faf4ec2022-04-08 22:53:25 +0200401 repo = self._split_repo(kdu_model)
402 if repo:
403 self.repo_update(cluster_id, repo)
404
garciadeblas82b591c2021-03-24 09:22:13 +0100405 command = self._get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300406 kdu_model,
407 kdu_instance,
408 namespace,
409 params_str,
410 version,
411 atomic,
412 timeout,
413 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100414 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000415
416 self.log.debug("installing: {}".format(command))
417
418 if atomic:
419 # exec helm in a task
420 exec_task = asyncio.ensure_future(
421 coro_or_future=self._local_async_exec(
422 command=command, raise_exception_on_error=False, env=env
423 )
424 )
425
426 # write status in another task
427 status_task = asyncio.ensure_future(
428 coro_or_future=self._store_status(
429 cluster_id=cluster_id,
430 kdu_instance=kdu_instance,
431 namespace=namespace,
432 db_dict=db_dict,
433 operation="install",
434 run_once=False,
435 )
436 )
437
438 # wait for execution task
439 await asyncio.wait([exec_task])
440
441 # cancel status task
442 status_task.cancel()
443
444 output, rc = exec_task.result()
445
446 else:
447
448 output, rc = await self._local_async_exec(
449 command=command, raise_exception_on_error=False, env=env
450 )
451
452 # remove temporal values yaml file
453 if file_to_delete:
454 os.remove(file_to_delete)
455
456 # write final status
457 await self._store_status(
458 cluster_id=cluster_id,
459 kdu_instance=kdu_instance,
460 namespace=namespace,
461 db_dict=db_dict,
462 operation="install",
463 run_once=True,
464 check_every=0,
465 )
466
467 if rc != 0:
468 msg = "Error executing command: {}\nOutput: {}".format(command, output)
469 self.log.error(msg)
470 raise K8sException(msg)
471
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000472 async def upgrade(
473 self,
474 cluster_uuid: str,
475 kdu_instance: str,
476 kdu_model: str = None,
477 atomic: bool = True,
478 timeout: float = 300,
479 params: dict = None,
480 db_dict: dict = None,
481 ):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100482 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000483
484 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100485 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000486
487 # look for instance to obtain namespace
488 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
489 if not instance_info:
490 raise K8sException("kdu_instance {} not found".format(kdu_instance))
491
492 # init env, paths
493 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100494 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000495 )
496
bravof7bd5c6a2021-11-17 11:14:57 -0300497 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100498 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300499
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000500 # params to str
501 params_str, file_to_delete = self._params_to_file_option(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100502 cluster_id=cluster_uuid, params=params
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000503 )
504
505 # version
aktas867418c2021-10-19 18:26:13 +0300506 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000507
garciadeblas7faf4ec2022-04-08 22:53:25 +0200508 repo = self._split_repo(kdu_model)
509 if repo:
510 self.repo_update(cluster_uuid, repo)
511
garciadeblas82b591c2021-03-24 09:22:13 +0100512 command = self._get_upgrade_command(
513 kdu_model,
514 kdu_instance,
515 instance_info["namespace"],
516 params_str,
517 version,
518 atomic,
519 timeout,
bravof7bd5c6a2021-11-17 11:14:57 -0300520 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100521 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000522
523 self.log.debug("upgrading: {}".format(command))
524
525 if atomic:
526
527 # exec helm in a task
528 exec_task = asyncio.ensure_future(
529 coro_or_future=self._local_async_exec(
530 command=command, raise_exception_on_error=False, env=env
531 )
532 )
533 # write status in another task
534 status_task = asyncio.ensure_future(
535 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100536 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000537 kdu_instance=kdu_instance,
538 namespace=instance_info["namespace"],
539 db_dict=db_dict,
540 operation="upgrade",
541 run_once=False,
542 )
543 )
544
545 # wait for execution task
546 await asyncio.wait([exec_task])
547
548 # cancel status task
549 status_task.cancel()
550 output, rc = exec_task.result()
551
552 else:
553
554 output, rc = await self._local_async_exec(
555 command=command, raise_exception_on_error=False, env=env
556 )
557
558 # remove temporal values yaml file
559 if file_to_delete:
560 os.remove(file_to_delete)
561
562 # write final status
563 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100564 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000565 kdu_instance=kdu_instance,
566 namespace=instance_info["namespace"],
567 db_dict=db_dict,
568 operation="upgrade",
569 run_once=True,
570 check_every=0,
571 )
572
573 if rc != 0:
574 msg = "Error executing command: {}\nOutput: {}".format(command, output)
575 self.log.error(msg)
576 raise K8sException(msg)
577
578 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100579 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000580
581 # return new revision number
582 instance = await self.get_instance_info(
583 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
584 )
585 if instance:
586 revision = int(instance.get("revision"))
587 self.log.debug("New revision: {}".format(revision))
588 return revision
589 else:
590 return 0
591
aktas2962f3e2021-03-15 11:05:35 +0300592 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100593 self,
594 kdu_instance: str,
595 scale: int,
596 resource_name: str,
597 total_timeout: float = 1800,
aktas867418c2021-10-19 18:26:13 +0300598 cluster_uuid: str = None,
599 kdu_model: str = None,
600 atomic: bool = True,
601 db_dict: dict = None,
garciadeblas82b591c2021-03-24 09:22:13 +0100602 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300603 ):
aktas867418c2021-10-19 18:26:13 +0300604 """Scale a resource in a Helm Chart.
605
606 Args:
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
617
618 Returns:
619 True if successful, False otherwise
620 """
621
Pedro Escaleirab41de172022-04-02 00:44:08 +0100622 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300623 if resource_name:
624 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100625 resource_name, kdu_model, cluster_uuid
aktas867418c2021-10-19 18:26:13 +0300626 )
627
628 self.log.debug(debug_mgs)
629
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
633 if not instance_info:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance))
635
636 # init env, paths
637 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100638 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300639 )
640
641 # version
642 kdu_model, version = self._split_version(kdu_model)
643
644 repo_url = await self._find_repo(kdu_model, cluster_uuid)
645 if not repo_url:
646 raise K8sException(
647 "Repository not found for kdu_model {}".format(kdu_model)
648 )
649
650 _, replica_str = await self._get_replica_count_url(
651 kdu_model, repo_url, resource_name
652 )
653
654 command = self._get_upgrade_scale_command(
655 kdu_model,
656 kdu_instance,
657 instance_info["namespace"],
658 scale,
659 version,
660 atomic,
661 replica_str,
662 total_timeout,
663 resource_name,
664 paths["kube_config"],
665 )
666
667 self.log.debug("scaling: {}".format(command))
668
669 if atomic:
670 # exec helm in a task
671 exec_task = asyncio.ensure_future(
672 coro_or_future=self._local_async_exec(
673 command=command, raise_exception_on_error=False, env=env
674 )
675 )
676 # write status in another task
677 status_task = asyncio.ensure_future(
678 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100679 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300680 kdu_instance=kdu_instance,
681 namespace=instance_info["namespace"],
682 db_dict=db_dict,
683 operation="scale",
684 run_once=False,
685 )
686 )
687
688 # wait for execution task
689 await asyncio.wait([exec_task])
690
691 # cancel status task
692 status_task.cancel()
693 output, rc = exec_task.result()
694
695 else:
696 output, rc = await self._local_async_exec(
697 command=command, raise_exception_on_error=False, env=env
698 )
699
700 # write final status
701 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100702 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300703 kdu_instance=kdu_instance,
704 namespace=instance_info["namespace"],
705 db_dict=db_dict,
706 operation="scale",
707 run_once=True,
708 check_every=0,
709 )
710
711 if rc != 0:
712 msg = "Error executing command: {}\nOutput: {}".format(command, output)
713 self.log.error(msg)
714 raise K8sException(msg)
715
716 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100717 self.fs.reverse_sync(from_path=cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300718
719 return True
aktas2962f3e2021-03-15 11:05:35 +0300720
721 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100722 self,
723 resource_name: str,
724 kdu_instance: str,
aktas867418c2021-10-19 18:26:13 +0300725 cluster_uuid: str,
726 kdu_model: str,
garciadeblas82b591c2021-03-24 09:22:13 +0100727 **kwargs,
aktas867418c2021-10-19 18:26:13 +0300728 ) -> int:
729 """Get a resource scale count.
730
731 Args:
732 cluster_uuid: The UUID of the cluster
733 resource_name: Resource name
734 kdu_instance: KDU instance name
735 kdu_model: The name or path of a bundle
736 kwargs: Additional parameters
737
738 Returns:
739 Resource instance count
740 """
741
aktas867418c2021-10-19 18:26:13 +0300742 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100743 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300744 )
745
746 # look for instance to obtain namespace
747 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
748 if not instance_info:
749 raise K8sException("kdu_instance {} not found".format(kdu_instance))
750
751 # init env, paths
752 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100753 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300754 )
755
756 replicas = await self._get_replica_count_instance(
757 kdu_instance, instance_info["namespace"], paths["kube_config"]
758 )
759
760 # Get default value if scale count is not found from provided values
761 if not replicas:
762 repo_url = await self._find_repo(kdu_model, cluster_uuid)
763 if not repo_url:
764 raise K8sException(
765 "Repository not found for kdu_model {}".format(kdu_model)
766 )
767
768 replicas, _ = await self._get_replica_count_url(
769 kdu_model, repo_url, resource_name
770 )
771
772 if not replicas:
773 msg = "Replica count not found. Cannot be scaled"
774 self.log.error(msg)
775 raise K8sException(msg)
776
777 return int(replicas)
aktas2962f3e2021-03-15 11:05:35 +0300778
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000779 async def rollback(
780 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
781 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000782 self.log.debug(
783 "rollback kdu_instance {} to revision {} from cluster {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100784 kdu_instance, revision, cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000785 )
786 )
787
788 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100789 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000790
791 # look for instance to obtain namespace
792 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
793 if not instance_info:
794 raise K8sException("kdu_instance {} not found".format(kdu_instance))
795
796 # init env, paths
797 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100798 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000799 )
800
bravof7bd5c6a2021-11-17 11:14:57 -0300801 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100802 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300803
garciadeblas82b591c2021-03-24 09:22:13 +0100804 command = self._get_rollback_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300805 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
garciadeblas82b591c2021-03-24 09:22:13 +0100806 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000807
808 self.log.debug("rolling_back: {}".format(command))
809
810 # exec helm in a task
811 exec_task = asyncio.ensure_future(
812 coro_or_future=self._local_async_exec(
813 command=command, raise_exception_on_error=False, env=env
814 )
815 )
816 # write status in another task
817 status_task = asyncio.ensure_future(
818 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100819 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000820 kdu_instance=kdu_instance,
821 namespace=instance_info["namespace"],
822 db_dict=db_dict,
823 operation="rollback",
824 run_once=False,
825 )
826 )
827
828 # wait for execution task
829 await asyncio.wait([exec_task])
830
831 # cancel status task
832 status_task.cancel()
833
834 output, rc = exec_task.result()
835
836 # write final status
837 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100838 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000839 kdu_instance=kdu_instance,
840 namespace=instance_info["namespace"],
841 db_dict=db_dict,
842 operation="rollback",
843 run_once=True,
844 check_every=0,
845 )
846
847 if rc != 0:
848 msg = "Error executing command: {}\nOutput: {}".format(command, output)
849 self.log.error(msg)
850 raise K8sException(msg)
851
852 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100853 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000854
855 # return new revision number
856 instance = await self.get_instance_info(
857 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
858 )
859 if instance:
860 revision = int(instance.get("revision"))
861 self.log.debug("New revision: {}".format(revision))
862 return revision
863 else:
864 return 0
865
David Garciaeb8943a2021-04-12 12:07:37 +0200866 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000867 """
868 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
869 (this call should happen after all _terminate-config-primitive_ of the VNF
870 are invoked).
871
872 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
873 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200874 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000875 :return: True if successful
876 """
877
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000878 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100879 "uninstall kdu_instance {} from cluster {}".format(
880 kdu_instance, cluster_uuid
881 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000882 )
883
884 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100885 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000886
887 # look for instance to obtain namespace
888 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
889 if not instance_info:
David Garcia7add1872021-08-18 14:52:52 +0200890 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
891 return True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000892 # init env, paths
893 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100894 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000895 )
896
bravof7bd5c6a2021-11-17 11:14:57 -0300897 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100898 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300899
900 command = self._get_uninstall_command(
901 kdu_instance, instance_info["namespace"], paths["kube_config"]
902 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000903 output, _rc = await self._local_async_exec(
904 command=command, raise_exception_on_error=True, env=env
905 )
906
907 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100908 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000909
910 return self._output_to_table(output)
911
912 async def instances_list(self, cluster_uuid: str) -> list:
913 """
914 returns a list of deployed releases in a cluster
915
916 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
917 :return:
918 """
919
Pedro Escaleirab41de172022-04-02 00:44:08 +0100920 self.log.debug("list releases for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000921
922 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100923 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000924
925 # execute internal command
Pedro Escaleirab41de172022-04-02 00:44:08 +0100926 result = await self._instances_list(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000927
928 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100929 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000930
931 return result
932
933 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
934 instances = await self.instances_list(cluster_uuid=cluster_uuid)
935 for instance in instances:
936 if instance.get("name") == kdu_instance:
937 return instance
938 self.log.debug("Instance {} not found".format(kdu_instance))
939 return None
940
aticig8070c3c2022-04-18 00:31:42 +0300941 async def upgrade_charm(
942 self,
943 ee_id: str = None,
944 path: str = None,
945 charm_id: str = None,
946 charm_type: str = None,
947 timeout: float = None,
948 ) -> str:
949 """This method upgrade charms in VNFs
950
951 Args:
952 ee_id: Execution environment id
953 path: Local path to the charm
954 charm_id: charm-id
955 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
956 timeout: (Float) Timeout for the ns update operation
957
958 Returns:
959 The output of the update operation if status equals to "completed"
960 """
961 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
962
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000963 async def exec_primitive(
964 self,
965 cluster_uuid: str = None,
966 kdu_instance: str = None,
967 primitive_name: str = None,
968 timeout: float = 300,
969 params: dict = None,
970 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200971 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000972 ) -> str:
973 """Exec primitive (Juju action)
974
975 :param cluster_uuid: The UUID of the cluster or namespace:cluster
976 :param kdu_instance: The unique name of the KDU instance
977 :param primitive_name: Name of action that will be executed
978 :param timeout: Timeout for action execution
979 :param params: Dictionary of all the parameters needed for the action
980 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200981 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000982
983 :return: Returns the output of the action
984 """
985 raise K8sException(
986 "KDUs deployed with Helm don't support actions "
987 "different from rollback, upgrade and status"
988 )
989
garciadeblas82b591c2021-03-24 09:22:13 +0100990 async def get_services(
991 self, cluster_uuid: str, kdu_instance: str, namespace: str
992 ) -> list:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000993 """
994 Returns a list of services defined for the specified kdu instance.
995
996 :param cluster_uuid: UUID of a K8s cluster known by OSM
997 :param kdu_instance: unique name for the KDU instance
998 :param namespace: K8s namespace used by the KDU instance
999 :return: If successful, it will return a list of services, Each service
1000 can have the following data:
1001 - `name` of the service
1002 - `type` type of service in the k8 cluster
1003 - `ports` List of ports offered by the service, for each port includes at least
1004 name, port, protocol
1005 - `cluster_ip` Internal ip to be used inside k8s cluster
1006 - `external_ip` List of external ips (in case they are available)
1007 """
1008
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001009 self.log.debug(
1010 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1011 cluster_uuid, kdu_instance
1012 )
1013 )
1014
bravof7bd5c6a2021-11-17 11:14:57 -03001015 # init env, paths
1016 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001017 cluster_name=cluster_uuid, create_if_not_exist=True
bravof7bd5c6a2021-11-17 11:14:57 -03001018 )
1019
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001020 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001021 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001022
1023 # get list of services names for kdu
bravof7bd5c6a2021-11-17 11:14:57 -03001024 service_names = await self._get_services(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001025 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
bravof7bd5c6a2021-11-17 11:14:57 -03001026 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001027
1028 service_list = []
1029 for service in service_names:
Pedro Escaleirab41de172022-04-02 00:44:08 +01001030 service = await self._get_service(cluster_uuid, service, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001031 service_list.append(service)
1032
1033 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001034 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001035
1036 return service_list
1037
garciadeblas82b591c2021-03-24 09:22:13 +01001038 async def get_service(
1039 self, cluster_uuid: str, service_name: str, namespace: str
1040 ) -> object:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001041
1042 self.log.debug(
1043 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
garciadeblas82b591c2021-03-24 09:22:13 +01001044 service_name, namespace, cluster_uuid
1045 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001046 )
1047
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001048 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001049 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001050
Pedro Escaleirab41de172022-04-02 00:44:08 +01001051 service = await self._get_service(cluster_uuid, service_name, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001052
1053 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001054 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001055
1056 return service
1057
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001058 async def status_kdu(
1059 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1060 ) -> Union[str, dict]:
David Garciaeb8943a2021-04-12 12:07:37 +02001061 """
1062 This call would retrieve tha current state of a given KDU instance. It would be
1063 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1064 values_ of the configuration parameters applied to a given instance. This call
1065 would be based on the `status` call.
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001066
David Garciaeb8943a2021-04-12 12:07:37 +02001067 :param cluster_uuid: UUID of a K8s cluster known by OSM
1068 :param kdu_instance: unique name for the KDU instance
1069 :param kwargs: Additional parameters (None yet)
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001070 :param yaml_format: if the return shall be returned as an YAML string or as a
1071 dictionary
David Garciaeb8943a2021-04-12 12:07:37 +02001072 :return: If successful, it will return the following vector of arguments:
1073 - K8s `namespace` in the cluster where the KDU lives
1074 - `state` of the KDU instance. It can be:
1075 - UNKNOWN
1076 - DEPLOYED
1077 - DELETED
1078 - SUPERSEDED
1079 - FAILED or
1080 - DELETING
1081 - List of `resources` (objects) that this release consists of, sorted by kind,
1082 and the status of those resources
1083 - Last `deployment_time`.
1084
1085 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001086 self.log.debug(
1087 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1088 cluster_uuid, kdu_instance
1089 )
1090 )
1091
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001092 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001093 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001094
1095 # get instance: needed to obtain namespace
Pedro Escaleirab41de172022-04-02 00:44:08 +01001096 instances = await self._instances_list(cluster_id=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001097 for instance in instances:
1098 if instance.get("name") == kdu_instance:
1099 break
1100 else:
1101 # instance does not exist
garciadeblas82b591c2021-03-24 09:22:13 +01001102 raise K8sException(
1103 "Instance name: {} not found in cluster: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001104 kdu_instance, cluster_uuid
garciadeblas82b591c2021-03-24 09:22:13 +01001105 )
1106 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001107
1108 status = await self._status_kdu(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001109 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001110 kdu_instance=kdu_instance,
1111 namespace=instance["namespace"],
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001112 yaml_format=yaml_format,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001113 show_error_log=True,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001114 )
1115
1116 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001117 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001118
1119 return status
1120
aktas867418c2021-10-19 18:26:13 +03001121 async def get_values_kdu(
1122 self, kdu_instance: str, namespace: str, kubeconfig: str
1123 ) -> str:
1124
1125 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1126
1127 return await self._exec_get_command(
1128 get_command="values",
1129 kdu_instance=kdu_instance,
1130 namespace=namespace,
1131 kubeconfig=kubeconfig,
1132 )
1133
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001134 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1135
1136 self.log.debug(
1137 "inspect kdu_model values {} from (optional) repo: {}".format(
1138 kdu_model, repo_url
1139 )
1140 )
1141
aktas867418c2021-10-19 18:26:13 +03001142 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001143 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1144 )
1145
1146 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1147
1148 self.log.debug(
1149 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1150 )
1151
aktas867418c2021-10-19 18:26:13 +03001152 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001153 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1154 )
1155
1156 async def synchronize_repos(self, cluster_uuid: str):
1157
1158 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1159 try:
1160 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1161 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1162
1163 local_repo_list = await self.repo_list(cluster_uuid)
1164 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1165
1166 deleted_repo_list = []
1167 added_repo_dict = {}
1168
1169 # iterate over the list of repos in the database that should be
1170 # added if not present
1171 for repo_name, db_repo in db_repo_dict.items():
1172 try:
1173 # check if it is already present
1174 curr_repo_url = local_repo_dict.get(db_repo["name"])
1175 repo_id = db_repo.get("_id")
1176 if curr_repo_url != db_repo["url"]:
1177 if curr_repo_url:
garciadeblas82b591c2021-03-24 09:22:13 +01001178 self.log.debug(
1179 "repo {} url changed, delete and and again".format(
1180 db_repo["url"]
1181 )
1182 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001183 await self.repo_remove(cluster_uuid, db_repo["name"])
1184 deleted_repo_list.append(repo_id)
1185
1186 # add repo
1187 self.log.debug("add repo {}".format(db_repo["name"]))
bravof0ab522f2021-11-23 19:33:18 -03001188 if "ca_cert" in db_repo:
1189 await self.repo_add(
1190 cluster_uuid,
1191 db_repo["name"],
1192 db_repo["url"],
1193 cert=db_repo["ca_cert"],
1194 )
1195 else:
1196 await self.repo_add(
1197 cluster_uuid,
1198 db_repo["name"],
1199 db_repo["url"],
1200 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001201 added_repo_dict[repo_id] = db_repo["name"]
1202 except Exception as e:
1203 raise K8sException(
1204 "Error adding repo id: {}, err_msg: {} ".format(
1205 repo_id, repr(e)
1206 )
1207 )
1208
1209 # Delete repos that are present but not in nbi_list
1210 for repo_name in local_repo_dict:
1211 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1212 self.log.debug("delete repo {}".format(repo_name))
1213 try:
1214 await self.repo_remove(cluster_uuid, repo_name)
1215 deleted_repo_list.append(repo_name)
1216 except Exception as e:
1217 self.warning(
1218 "Error deleting repo, name: {}, err_msg: {}".format(
1219 repo_name, str(e)
1220 )
1221 )
1222
1223 return deleted_repo_list, added_repo_dict
1224
1225 except K8sException:
1226 raise
1227 except Exception as e:
1228 # Do not raise errors synchronizing repos
1229 self.log.error("Error synchronizing repos: {}".format(e))
1230 raise Exception("Error synchronizing repos: {}".format(e))
1231
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001232 def _get_db_repos_dict(self, repo_ids: list):
1233 db_repos_dict = {}
1234 for repo_id in repo_ids:
1235 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1236 db_repos_dict[db_repo["name"]] = db_repo
1237 return db_repos_dict
1238
1239 """
1240 ####################################################################################
1241 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1242 ####################################################################################
1243 """
1244
1245 @abc.abstractmethod
1246 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1247 """
1248 Creates and returns base cluster and kube dirs and returns them.
1249 Also created helm3 dirs according to new directory specification, paths are
1250 not returned but assigned to helm environment variables
1251
1252 :param cluster_name: cluster_name
1253 :return: Dictionary with config_paths and dictionary with helm environment variables
1254 """
1255
1256 @abc.abstractmethod
1257 async def _cluster_init(self, cluster_id, namespace, paths, env):
1258 """
1259 Implements the helm version dependent cluster initialization
1260 """
1261
1262 @abc.abstractmethod
1263 async def _instances_list(self, cluster_id):
1264 """
1265 Implements the helm version dependent helm instances list
1266 """
1267
1268 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001269 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001270 """
1271 Implements the helm version dependent method to obtain services from a helm instance
1272 """
1273
1274 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001275 async def _status_kdu(
1276 self,
1277 cluster_id: str,
1278 kdu_instance: str,
1279 namespace: str = None,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001280 yaml_format: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001281 show_error_log: bool = False,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001282 ) -> Union[str, dict]:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001283 """
1284 Implements the helm version dependent method to obtain status of a helm instance
1285 """
1286
1287 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001288 def _get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001289 self,
1290 kdu_model,
1291 kdu_instance,
1292 namespace,
1293 params_str,
1294 version,
1295 atomic,
1296 timeout,
1297 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001298 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001299 """
1300 Obtain command to be executed to delete the indicated instance
1301 """
1302
1303 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001304 def _get_upgrade_scale_command(
1305 self,
1306 kdu_model,
1307 kdu_instance,
1308 namespace,
1309 count,
1310 version,
1311 atomic,
1312 replicas,
1313 timeout,
1314 resource_name,
1315 kubeconfig,
1316 ) -> str:
1317 """Obtain command to be executed to upgrade the indicated instance."""
1318
1319 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001320 def _get_upgrade_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001321 self,
1322 kdu_model,
1323 kdu_instance,
1324 namespace,
1325 params_str,
1326 version,
1327 atomic,
1328 timeout,
1329 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001330 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001331 """
1332 Obtain command to be executed to upgrade the indicated instance
1333 """
1334
1335 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001336 def _get_rollback_command(
1337 self, kdu_instance, namespace, revision, kubeconfig
1338 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001339 """
1340 Obtain command to be executed to rollback the indicated instance
1341 """
1342
1343 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001344 def _get_uninstall_command(
1345 self, kdu_instance: str, namespace: str, kubeconfig: str
1346 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001347 """
1348 Obtain command to be executed to delete the indicated instance
1349 """
1350
1351 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001352 def _get_inspect_command(
1353 self, show_command: str, kdu_model: str, repo_str: str, version: str
1354 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001355 """
1356 Obtain command to be executed to obtain information about the kdu
1357 """
1358
1359 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001360 def _get_get_command(
1361 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1362 ):
1363 """Obtain command to be executed to get information about the kdu instance."""
1364
1365 @abc.abstractmethod
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001366 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1367 """
1368 Method call to uninstall cluster software for helm. This method is dependent
1369 of helm version
1370 For Helm v2 it will be called when Tiller must be uninstalled
1371 For Helm v3 it does nothing and does not need to be callled
1372 """
1373
lloretgalleg095392b2020-11-20 11:28:08 +00001374 @abc.abstractmethod
1375 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1376 """
1377 Obtains the cluster repos identifiers
1378 """
1379
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001380 """
1381 ####################################################################################
1382 ################################### P R I V A T E ##################################
1383 ####################################################################################
1384 """
1385
1386 @staticmethod
1387 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1388 if os.path.exists(filename):
1389 return True
1390 else:
1391 msg = "File {} does not exist".format(filename)
1392 if exception_if_not_exists:
1393 raise K8sException(msg)
1394
1395 @staticmethod
1396 def _remove_multiple_spaces(strobj):
1397 strobj = strobj.strip()
1398 while " " in strobj:
1399 strobj = strobj.replace(" ", " ")
1400 return strobj
1401
1402 @staticmethod
1403 def _output_to_lines(output: str) -> list:
1404 output_lines = list()
1405 lines = output.splitlines(keepends=False)
1406 for line in lines:
1407 line = line.strip()
1408 if len(line) > 0:
1409 output_lines.append(line)
1410 return output_lines
1411
1412 @staticmethod
1413 def _output_to_table(output: str) -> list:
1414 output_table = list()
1415 lines = output.splitlines(keepends=False)
1416 for line in lines:
1417 line = line.replace("\t", " ")
1418 line_list = list()
1419 output_table.append(line_list)
1420 cells = line.split(sep=" ")
1421 for cell in cells:
1422 cell = cell.strip()
1423 if len(cell) > 0:
1424 line_list.append(cell)
1425 return output_table
1426
1427 @staticmethod
1428 def _parse_services(output: str) -> list:
1429 lines = output.splitlines(keepends=False)
1430 services = []
1431 for line in lines:
1432 line = line.replace("\t", " ")
1433 cells = line.split(sep=" ")
1434 if len(cells) > 0 and cells[0].startswith("service/"):
1435 elems = cells[0].split(sep="/")
1436 if len(elems) > 1:
1437 services.append(elems[1])
1438 return services
1439
1440 @staticmethod
1441 def _get_deep(dictionary: dict, members: tuple):
1442 target = dictionary
1443 value = None
1444 try:
1445 for m in members:
1446 value = target.get(m)
1447 if not value:
1448 return None
1449 else:
1450 target = value
1451 except Exception:
1452 pass
1453 return value
1454
1455 # find key:value in several lines
1456 @staticmethod
1457 def _find_in_lines(p_lines: list, p_key: str) -> str:
1458 for line in p_lines:
1459 try:
1460 if line.startswith(p_key + ":"):
1461 parts = line.split(":")
1462 the_value = parts[1].strip()
1463 return the_value
1464 except Exception:
1465 # ignore it
1466 pass
1467 return None
1468
1469 @staticmethod
1470 def _lower_keys_list(input_list: list):
1471 """
1472 Transform the keys in a list of dictionaries to lower case and returns a new list
1473 of dictionaries
1474 """
1475 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001476 if input_list:
1477 for dictionary in input_list:
1478 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1479 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001480 return new_list
1481
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001482 async def _local_async_exec(
1483 self,
1484 command: str,
1485 raise_exception_on_error: bool = False,
1486 show_error_log: bool = True,
1487 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001488 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001489 ) -> (str, int):
1490
1491 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001492 self.log.debug(
1493 "Executing async local command: {}, env: {}".format(command, env)
1494 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001495
1496 # split command
1497 command = shlex.split(command)
1498
1499 environ = os.environ.copy()
1500 if env:
1501 environ.update(env)
1502
1503 try:
1504 process = await asyncio.create_subprocess_exec(
garciadeblas82b591c2021-03-24 09:22:13 +01001505 *command,
1506 stdout=asyncio.subprocess.PIPE,
1507 stderr=asyncio.subprocess.PIPE,
1508 env=environ,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001509 )
1510
1511 # wait for command terminate
1512 stdout, stderr = await process.communicate()
1513
1514 return_code = process.returncode
1515
1516 output = ""
1517 if stdout:
1518 output = stdout.decode("utf-8").strip()
1519 # output = stdout.decode()
1520 if stderr:
1521 output = stderr.decode("utf-8").strip()
1522 # output = stderr.decode()
1523
1524 if return_code != 0 and show_error_log:
1525 self.log.debug(
1526 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1527 )
1528 else:
1529 self.log.debug("Return code: {}".format(return_code))
1530
1531 if raise_exception_on_error and return_code != 0:
1532 raise K8sException(output)
1533
1534 if encode_utf8:
1535 output = output.encode("utf-8").strip()
1536 output = str(output).replace("\\n", "\n")
1537
1538 return output, return_code
1539
1540 except asyncio.CancelledError:
1541 raise
1542 except K8sException:
1543 raise
1544 except Exception as e:
1545 msg = "Exception executing command: {} -> {}".format(command, e)
1546 self.log.error(msg)
1547 if raise_exception_on_error:
1548 raise K8sException(e) from e
1549 else:
1550 return "", -1
1551
garciadeblas82b591c2021-03-24 09:22:13 +01001552 async def _local_async_exec_pipe(
1553 self,
1554 command1: str,
1555 command2: str,
1556 raise_exception_on_error: bool = True,
1557 show_error_log: bool = True,
1558 encode_utf8: bool = False,
1559 env: dict = None,
1560 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001561
1562 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1563 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1564 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001565 self.log.debug(
1566 "Executing async local command: {}, env: {}".format(command, env)
1567 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001568
1569 # split command
1570 command1 = shlex.split(command1)
1571 command2 = shlex.split(command2)
1572
1573 environ = os.environ.copy()
1574 if env:
1575 environ.update(env)
1576
1577 try:
1578 read, write = os.pipe()
1579 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1580 os.close(write)
garciadeblas82b591c2021-03-24 09:22:13 +01001581 process_2 = await asyncio.create_subprocess_exec(
1582 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1583 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001584 os.close(read)
1585 stdout, stderr = await process_2.communicate()
1586
1587 return_code = process_2.returncode
1588
1589 output = ""
1590 if stdout:
1591 output = stdout.decode("utf-8").strip()
1592 # output = stdout.decode()
1593 if stderr:
1594 output = stderr.decode("utf-8").strip()
1595 # output = stderr.decode()
1596
1597 if return_code != 0 and show_error_log:
1598 self.log.debug(
1599 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1600 )
1601 else:
1602 self.log.debug("Return code: {}".format(return_code))
1603
1604 if raise_exception_on_error and return_code != 0:
1605 raise K8sException(output)
1606
1607 if encode_utf8:
1608 output = output.encode("utf-8").strip()
1609 output = str(output).replace("\\n", "\n")
1610
1611 return output, return_code
1612 except asyncio.CancelledError:
1613 raise
1614 except K8sException:
1615 raise
1616 except Exception as e:
1617 msg = "Exception executing command: {} -> {}".format(command, e)
1618 self.log.error(msg)
1619 if raise_exception_on_error:
1620 raise K8sException(e) from e
1621 else:
1622 return "", -1
1623
1624 async def _get_service(self, cluster_id, service_name, namespace):
1625 """
1626 Obtains the data of the specified service in the k8cluster.
1627
1628 :param cluster_id: id of a K8s cluster known by OSM
1629 :param service_name: name of the K8s service in the specified namespace
1630 :param namespace: K8s namespace used by the KDU instance
1631 :return: If successful, it will return a service with the following data:
1632 - `name` of the service
1633 - `type` type of service in the k8 cluster
1634 - `ports` List of ports offered by the service, for each port includes at least
1635 name, port, protocol
1636 - `cluster_ip` Internal ip to be used inside k8s cluster
1637 - `external_ip` List of external ips (in case they are available)
1638 """
1639
1640 # init config, env
1641 paths, env = self._init_paths_env(
1642 cluster_name=cluster_id, create_if_not_exist=True
1643 )
1644
1645 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1646 self.kubectl_command, paths["kube_config"], namespace, service_name
1647 )
1648
1649 output, _rc = await self._local_async_exec(
1650 command=command, raise_exception_on_error=True, env=env
1651 )
1652
1653 data = yaml.load(output, Loader=yaml.SafeLoader)
1654
1655 service = {
1656 "name": service_name,
1657 "type": self._get_deep(data, ("spec", "type")),
1658 "ports": self._get_deep(data, ("spec", "ports")),
garciadeblas82b591c2021-03-24 09:22:13 +01001659 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001660 }
1661 if service["type"] == "LoadBalancer":
1662 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1663 ip_list = [elem["ip"] for elem in ip_map_list]
1664 service["external_ip"] = ip_list
1665
1666 return service
1667
aktas867418c2021-10-19 18:26:13 +03001668 async def _exec_get_command(
1669 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1670 ):
1671 """Obtains information about the kdu instance."""
1672
1673 full_command = self._get_get_command(
1674 get_command, kdu_instance, namespace, kubeconfig
1675 )
1676
1677 output, _rc = await self._local_async_exec(command=full_command)
1678
1679 return output
1680
1681 async def _exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001682 self, inspect_command: str, kdu_model: str, repo_url: str = None
1683 ):
aktas867418c2021-10-19 18:26:13 +03001684 """Obtains information about a kdu, no cluster (no env)."""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001685
1686 repo_str = ""
1687 if repo_url:
1688 repo_str = " --repo {}".format(repo_url)
1689
1690 idx = kdu_model.find("/")
1691 if idx >= 0:
1692 idx += 1
1693 kdu_model = kdu_model[idx:]
1694
aktas867418c2021-10-19 18:26:13 +03001695 kdu_model, version = self._split_version(kdu_model)
1696 if version:
1697 version_str = "--version {}".format(version)
1698 else:
1699 version_str = ""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001700
garciadeblas82b591c2021-03-24 09:22:13 +01001701 full_command = self._get_inspect_command(
aktas867418c2021-10-19 18:26:13 +03001702 inspect_command, kdu_model, repo_str, version_str
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001703 )
1704
aktas867418c2021-10-19 18:26:13 +03001705 output, _rc = await self._local_async_exec(command=full_command)
1706
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001707 return output
1708
aktas867418c2021-10-19 18:26:13 +03001709 async def _get_replica_count_url(
1710 self,
1711 kdu_model: str,
1712 repo_url: str,
1713 resource_name: str = None,
1714 ):
1715 """Get the replica count value in the Helm Chart Values.
1716
1717 Args:
1718 kdu_model: The name or path of a bundle
1719 repo_url: Helm Chart repository url
1720 resource_name: Resource name
1721
1722 Returns:
1723 True if replicas, False replicaCount
1724 """
1725
1726 kdu_values = yaml.load(
1727 await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
1728 )
1729
1730 if not kdu_values:
1731 raise K8sException(
1732 "kdu_values not found for kdu_model {}".format(kdu_model)
1733 )
1734
1735 if resource_name:
1736 kdu_values = kdu_values.get(resource_name, None)
1737
1738 if not kdu_values:
1739 msg = "resource {} not found in the values in model {}".format(
1740 resource_name, kdu_model
1741 )
1742 self.log.error(msg)
1743 raise K8sException(msg)
1744
1745 duplicate_check = False
1746
1747 replica_str = ""
1748 replicas = None
1749
1750 if kdu_values.get("replicaCount", None):
1751 replicas = kdu_values["replicaCount"]
1752 replica_str = "replicaCount"
1753 elif kdu_values.get("replicas", None):
1754 duplicate_check = True
1755 replicas = kdu_values["replicas"]
1756 replica_str = "replicas"
1757 else:
1758 if resource_name:
1759 msg = (
1760 "replicaCount or replicas not found in the resource"
1761 "{} values in model {}. Cannot be scaled".format(
1762 resource_name, kdu_model
1763 )
1764 )
1765 else:
1766 msg = (
1767 "replicaCount or replicas not found in the values"
1768 "in model {}. Cannot be scaled".format(kdu_model)
1769 )
1770 self.log.error(msg)
1771 raise K8sException(msg)
1772
1773 # Control if replicas and replicaCount exists at the same time
1774 msg = "replicaCount and replicas are exists at the same time"
1775 if duplicate_check:
1776 if "replicaCount" in kdu_values:
1777 self.log.error(msg)
1778 raise K8sException(msg)
1779 else:
1780 if "replicas" in kdu_values:
1781 self.log.error(msg)
1782 raise K8sException(msg)
1783
1784 return replicas, replica_str
1785
1786 async def _get_replica_count_instance(
1787 self,
1788 kdu_instance: str,
1789 namespace: str,
1790 kubeconfig: str,
1791 resource_name: str = None,
1792 ):
1793 """Get the replica count value in the instance.
1794
1795 Args:
1796 kdu_instance: The name of the KDU instance
1797 namespace: KDU instance namespace
1798 kubeconfig:
1799 resource_name: Resource name
1800
1801 Returns:
1802 True if replicas, False replicaCount
1803 """
1804
1805 kdu_values = yaml.load(
1806 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1807 Loader=yaml.SafeLoader,
1808 )
1809
1810 replicas = None
1811
1812 if kdu_values:
1813 resource_values = (
1814 kdu_values.get(resource_name, None) if resource_name else None
1815 )
1816 replicas = (
1817 (
1818 resource_values.get("replicaCount", None)
1819 or resource_values.get("replicas", None)
1820 )
1821 if resource_values
1822 else (
1823 kdu_values.get("replicaCount", None)
1824 or kdu_values.get("replicas", None)
1825 )
1826 )
1827
1828 return replicas
1829
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001830 async def _store_status(
1831 self,
1832 cluster_id: str,
1833 operation: str,
1834 kdu_instance: str,
1835 namespace: str = None,
1836 check_every: float = 10,
1837 db_dict: dict = None,
1838 run_once: bool = False,
1839 ):
1840 while True:
1841 try:
1842 await asyncio.sleep(check_every)
1843 detailed_status = await self._status_kdu(
garciadeblas82b591c2021-03-24 09:22:13 +01001844 cluster_id=cluster_id,
1845 kdu_instance=kdu_instance,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001846 yaml_format=False,
garciadeblas82b591c2021-03-24 09:22:13 +01001847 namespace=namespace,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001848 )
1849 status = detailed_status.get("info").get("description")
garciadeblas82b591c2021-03-24 09:22:13 +01001850 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001851 # write status to db
1852 result = await self.write_app_status_to_db(
1853 db_dict=db_dict,
1854 status=str(status),
1855 detailed_status=str(detailed_status),
1856 operation=operation,
1857 )
1858 if not result:
1859 self.log.info("Error writing in database. Task exiting...")
1860 return
1861 except asyncio.CancelledError:
1862 self.log.debug("Task cancelled")
1863 return
1864 except Exception as e:
garciadeblas82b591c2021-03-24 09:22:13 +01001865 self.log.debug(
1866 "_store_status exception: {}".format(str(e)), exc_info=True
1867 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001868 pass
1869 finally:
1870 if run_once:
1871 return
1872
1873 # params for use in -f file
1874 # returns values file option and filename (in order to delete it at the end)
1875 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1876
1877 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001878 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001879
1880 def get_random_number():
1881 r = random.randrange(start=1, stop=99999999)
1882 s = str(r)
1883 while len(s) < 10:
1884 s = "0" + s
1885 return s
1886
1887 params2 = dict()
1888 for key in params:
1889 value = params.get(key)
1890 if "!!yaml" in str(value):
David Garcia513cb2d2022-05-31 11:01:09 +02001891 value = yaml.safe_load(value[7:])
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001892 params2[key] = value
1893
1894 values_file = get_random_number() + ".yaml"
1895 with open(values_file, "w") as stream:
1896 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1897
1898 return "-f {}".format(values_file), values_file
1899
1900 return "", None
1901
1902 # params for use in --set option
1903 @staticmethod
1904 def _params_to_set_option(params: dict) -> str:
1905 params_str = ""
1906 if params and len(params) > 0:
1907 start = True
1908 for key in params:
1909 value = params.get(key, None)
1910 if value is not None:
1911 if start:
1912 params_str += "--set "
1913 start = False
1914 else:
1915 params_str += ","
1916 params_str += "{}={}".format(key, value)
1917 return params_str
1918
1919 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01001920 def generate_kdu_instance_name(**kwargs):
1921 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001922 # check embeded chart (file or dir)
1923 if chart_name.startswith("/"):
1924 # extract file or directory name
David Garcia4ae527e2021-07-26 16:04:59 +02001925 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001926 # check URL
1927 elif "://" in chart_name:
1928 # extract last portion of URL
David Garcia4ae527e2021-07-26 16:04:59 +02001929 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001930
1931 name = ""
1932 for c in chart_name:
1933 if c.isalpha() or c.isnumeric():
1934 name += c
1935 else:
1936 name += "-"
1937 if len(name) > 35:
1938 name = name[0:35]
1939
1940 # if does not start with alpha character, prefix 'a'
1941 if not name[0].isalpha():
1942 name = "a" + name
1943
1944 name += "-"
1945
1946 def get_random_number():
1947 r = random.randrange(start=1, stop=99999999)
1948 s = str(r)
1949 s = s.rjust(10, "0")
1950 return s
1951
1952 name = name + get_random_number()
1953 return name.lower()
aktas867418c2021-10-19 18:26:13 +03001954
1955 def _split_version(self, kdu_model: str) -> (str, str):
1956 version = None
1957 if ":" in kdu_model:
1958 parts = kdu_model.split(sep=":")
1959 if len(parts) == 2:
1960 version = str(parts[1])
1961 kdu_model = parts[0]
1962 return kdu_model, version
1963
garciadeblas7faf4ec2022-04-08 22:53:25 +02001964 async def _split_repo(self, kdu_model: str) -> str:
1965 repo_name = None
1966 idx = kdu_model.find("/")
1967 if idx >= 0:
1968 repo_name = kdu_model[:idx]
1969 return repo_name
1970
aktas867418c2021-10-19 18:26:13 +03001971 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1972 repo_url = None
1973 idx = kdu_model.find("/")
1974 if idx >= 0:
1975 repo_name = kdu_model[:idx]
1976 # Find repository link
1977 local_repo_list = await self.repo_list(cluster_uuid)
1978 for repo in local_repo_list:
1979 repo_url = repo["url"] if repo["name"] == repo_name else None
1980 return repo_url