blob: 51eb4758a3496a3c6ae5e0897a1fdb6568af2528 [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
David Garcia4395cfa2021-05-28 16:21:51 +020073 self.config = EnvironConfig()
lloretgalleg1c83f2e2020-10-22 09:12:35 +000074 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
David Garcia4395cfa2021-05-28 16:21:51 +020089 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
lloretgalleg83e55892020-12-17 12:42:11 +000092
Pedro Escaleira1f222a92022-06-20 15:40:43 +010093 # Lock to avoid concurrent execution of helm commands
94 self.cmd_lock = asyncio.Lock()
95
Pedro Escaleirab41de172022-04-02 00:44:08 +010096 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000097 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010098 Obtains the namespace used by the cluster with the uuid passed by argument
99
100 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000101 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100102
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster = self.db.get_one(
105 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
106 )
107 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000108
109 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100110 self,
111 k8s_creds: str,
112 namespace: str = "kube-system",
113 reuse_cluster_uuid=None,
114 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000115 ) -> (str, bool):
116 """
117 It prepares a given K8s cluster environment to run Charts
118
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
120 '.kube/config'
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200124 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
128 """
129
130 if reuse_cluster_uuid:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100131 cluster_id = reuse_cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000132 else:
133 cluster_id = str(uuid4())
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000134
garciadeblas82b591c2021-03-24 09:22:13 +0100135 self.log.debug(
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
137 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000138
139 paths, env = self._init_paths_env(
140 cluster_name=cluster_id, create_if_not_exist=True
141 )
142 mode = stat.S_IRUSR | stat.S_IWUSR
143 with open(paths["kube_config"], "w", mode) as f:
144 f.write(k8s_creds)
145 os.chmod(paths["kube_config"], 0o600)
146
147 # Code with initialization specific of helm version
148 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
149
150 # sync fs with local data
151 self.fs.reverse_sync(from_path=cluster_id)
152
153 self.log.info("Cluster {} initialized".format(cluster_id))
154
Pedro Escaleirab41de172022-04-02 00:44:08 +0100155 return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000156
157 async def repo_add(
bravof0ab522f2021-11-23 19:33:18 -0300158 self,
159 cluster_uuid: str,
160 name: str,
161 url: str,
162 repo_type: str = "chart",
163 cert: str = None,
164 user: str = None,
165 password: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000166 ):
garciadeblas82b591c2021-03-24 09:22:13 +0100167 self.log.debug(
168 "Cluster {}, adding {} repository {}. URL: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100169 cluster_uuid, repo_type, name, url
garciadeblas82b591c2021-03-24 09:22:13 +0100170 )
171 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000172
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000173 # init_env
174 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100175 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000176 )
177
bravof7bd5c6a2021-11-17 11:14:57 -0300178 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100179 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300180
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000181 # helm repo add name url
bravof0ab522f2021-11-23 19:33:18 -0300182 command = ("env KUBECONFIG={} {} repo add {} {}").format(
bravof7bd5c6a2021-11-17 11:14:57 -0300183 paths["kube_config"], self._helm_command, name, url
184 )
bravof0ab522f2021-11-23 19:33:18 -0300185
186 if cert:
187 temp_cert_file = os.path.join(
Pedro Escaleira1188b5d2022-04-22 18:51:00 +0100188 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
bravof0ab522f2021-11-23 19:33:18 -0300189 )
190 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
191 with open(temp_cert_file, "w") as the_cert:
192 the_cert.write(cert)
193 command += " --ca-file {}".format(temp_cert_file)
194
195 if user:
196 command += " --username={}".format(user)
197
198 if password:
199 command += " --password={}".format(password)
200
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000201 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100202 await self._local_async_exec(
203 command=command, raise_exception_on_error=True, env=env
204 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000205
garciadeblasd4cee8c2022-05-04 10:57:36 +0200206 # helm repo update
garciadeblas069f0a32022-05-04 11:07:41 +0200207 command = "env KUBECONFIG={} {} repo update {}".format(
208 paths["kube_config"], self._helm_command, name
garciadeblasd4cee8c2022-05-04 10:57:36 +0200209 )
210 self.log.debug("updating repo: {}".format(command))
211 await self._local_async_exec(
212 command=command, raise_exception_on_error=False, env=env
213 )
214
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000215 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100216 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000217
garciadeblas7faf4ec2022-04-08 22:53:25 +0200218 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
219 self.log.debug(
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid, repo_type, name
222 )
223 )
224
225 # init_env
226 paths, env = self._init_paths_env(
227 cluster_name=cluster_uuid, create_if_not_exist=True
228 )
229
230 # sync local dir
231 self.fs.sync(from_path=cluster_uuid)
232
233 # helm repo update
234 command = "{} repo update {}".format(self._helm_command, name)
235 self.log.debug("updating repo: {}".format(command))
236 await self._local_async_exec(
237 command=command, raise_exception_on_error=False, env=env
238 )
239
240 # sync fs
241 self.fs.reverse_sync(from_path=cluster_uuid)
242
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000243 async def repo_list(self, cluster_uuid: str) -> list:
244 """
245 Get the list of registered repositories
246
247 :return: list of registered repositories: [ (name, url) .... ]
248 """
249
Pedro Escaleirab41de172022-04-02 00:44:08 +0100250 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000251
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000252 # config filename
253 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100254 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000255 )
256
bravof7bd5c6a2021-11-17 11:14:57 -0300257 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100258 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300259
260 command = "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths["kube_config"], self._helm_command
262 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000263
264 # Set exception to false because if there are no repos just want an empty list
265 output, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=False, env=env
267 )
268
269 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100270 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000271
272 if _rc == 0:
273 if output and len(output) > 0:
274 repos = yaml.load(output, Loader=yaml.SafeLoader)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self._lower_keys_list(repos)
277 else:
278 return []
279 else:
280 return []
281
282 async def repo_remove(self, cluster_uuid: str, name: str):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100283 self.log.debug(
284 "remove {} repositories for cluster {}".format(name, cluster_uuid)
285 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000286
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000287 # init env, paths
288 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100289 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000290 )
291
bravof7bd5c6a2021-11-17 11:14:57 -0300292 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100293 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300294
295 command = "env KUBECONFIG={} {} repo remove {}".format(
296 paths["kube_config"], self._helm_command, name
297 )
garciadeblas82b591c2021-03-24 09:22:13 +0100298 await self._local_async_exec(
299 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000300 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000301
302 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100303 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000304
305 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100306 self,
307 cluster_uuid: str,
308 force: bool = False,
309 uninstall_sw: bool = False,
310 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000311 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200312 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000313
David Garciaeb8943a2021-04-12 12:07:37 +0200314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
315
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to force the reset
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
321 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100322 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100323 self.log.debug(
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100325 cluster_uuid, uninstall_sw
garciadeblas82b591c2021-03-24 09:22:13 +0100326 )
327 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000328
329 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100330 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000331
332 # uninstall releases if needed.
333 if uninstall_sw:
334 releases = await self.instances_list(cluster_uuid=cluster_uuid)
335 if len(releases) > 0:
336 if force:
337 for r in releases:
338 try:
339 kdu_instance = r.get("name")
340 chart = r.get("chart")
341 self.log.debug(
342 "Uninstalling {} -> {}".format(chart, kdu_instance)
343 )
344 await self.uninstall(
345 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
346 )
347 except Exception as e:
348 # will not raise exception as it was found
349 # that in some cases of previously installed helm releases it
350 # raised an error
351 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100352 "Error uninstalling release {}: {}".format(
353 kdu_instance, e
354 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000355 )
356 else:
357 msg = (
358 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
Pedro Escaleirab41de172022-04-02 00:44:08 +0100359 ).format(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000360 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100361 uninstall_sw = (
362 False # Allow to remove k8s cluster without removing Tiller
363 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000364
365 if uninstall_sw:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100366 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000367
368 # delete cluster directory
Pedro Escaleirab41de172022-04-02 00:44:08 +0100369 self.log.debug("Removing directory {}".format(cluster_uuid))
370 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000371 # Remove also local directorio if still exist
Pedro Escaleirab41de172022-04-02 00:44:08 +0100372 direct = self.fs.path + "/" + cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000373 shutil.rmtree(direct, ignore_errors=True)
374
375 return True
376
garciadeblas04393192022-06-08 15:39:24 +0200377 def _is_helm_chart_a_file(self, chart_name: str):
378 return chart_name.count("/") > 1
379
lloretgalleg095392b2020-11-20 11:28:08 +0000380 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100381 self,
382 cluster_id: str,
383 kdu_model: str,
384 paths: dict,
385 env: dict,
386 kdu_instance: str,
387 atomic: bool = True,
388 timeout: float = 300,
389 params: dict = None,
390 db_dict: dict = None,
391 kdu_name: str = None,
392 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000393 ):
bravof7bd5c6a2021-11-17 11:14:57 -0300394 # init env, paths
395 paths, env = self._init_paths_env(
396 cluster_name=cluster_id, create_if_not_exist=True
397 )
398
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000399 # params to str
400 params_str, file_to_delete = self._params_to_file_option(
401 cluster_id=cluster_id, params=params
402 )
403
404 # version
aktas867418c2021-10-19 18:26:13 +0300405 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000406
garciadeblas7faf4ec2022-04-08 22:53:25 +0200407 repo = self._split_repo(kdu_model)
408 if repo:
limon537d2932022-07-21 13:55:55 +0200409 await self.repo_update(cluster_id, repo)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200410
garciadeblas82b591c2021-03-24 09:22:13 +0100411 command = self._get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300412 kdu_model,
413 kdu_instance,
414 namespace,
415 params_str,
416 version,
417 atomic,
418 timeout,
419 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100420 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000421
422 self.log.debug("installing: {}".format(command))
423
424 if atomic:
425 # exec helm in a task
426 exec_task = asyncio.ensure_future(
427 coro_or_future=self._local_async_exec(
428 command=command, raise_exception_on_error=False, env=env
429 )
430 )
431
432 # write status in another task
433 status_task = asyncio.ensure_future(
434 coro_or_future=self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000440 )
441 )
442
443 # wait for execution task
444 await asyncio.wait([exec_task])
445
446 # cancel status task
447 status_task.cancel()
448
449 output, rc = exec_task.result()
450
451 else:
452
453 output, rc = await self._local_async_exec(
454 command=command, raise_exception_on_error=False, env=env
455 )
456
457 # remove temporal values yaml file
458 if file_to_delete:
459 os.remove(file_to_delete)
460
461 # write final status
462 await self._store_status(
463 cluster_id=cluster_id,
464 kdu_instance=kdu_instance,
465 namespace=namespace,
466 db_dict=db_dict,
467 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000475 async def upgrade(
476 self,
477 cluster_uuid: str,
478 kdu_instance: str,
479 kdu_model: str = None,
480 atomic: bool = True,
481 timeout: float = 300,
482 params: dict = None,
483 db_dict: dict = None,
484 ):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100485 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000486
487 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100488 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000489
490 # look for instance to obtain namespace
491 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
492 if not instance_info:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance))
494
495 # init env, paths
496 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100497 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000498 )
499
bravof7bd5c6a2021-11-17 11:14:57 -0300500 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100501 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300502
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000503 # params to str
504 params_str, file_to_delete = self._params_to_file_option(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100505 cluster_id=cluster_uuid, params=params
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000506 )
507
508 # version
aktas867418c2021-10-19 18:26:13 +0300509 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000510
garciadeblas7faf4ec2022-04-08 22:53:25 +0200511 repo = self._split_repo(kdu_model)
512 if repo:
limon537d2932022-07-21 13:55:55 +0200513 await self.repo_update(cluster_uuid, repo)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200514
garciadeblas82b591c2021-03-24 09:22:13 +0100515 command = self._get_upgrade_command(
516 kdu_model,
517 kdu_instance,
518 instance_info["namespace"],
519 params_str,
520 version,
521 atomic,
522 timeout,
bravof7bd5c6a2021-11-17 11:14:57 -0300523 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100524 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000525
526 self.log.debug("upgrading: {}".format(command))
527
528 if atomic:
529
530 # exec helm in a task
531 exec_task = asyncio.ensure_future(
532 coro_or_future=self._local_async_exec(
533 command=command, raise_exception_on_error=False, env=env
534 )
535 )
536 # write status in another task
537 status_task = asyncio.ensure_future(
538 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100539 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000540 kdu_instance=kdu_instance,
541 namespace=instance_info["namespace"],
542 db_dict=db_dict,
543 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000544 )
545 )
546
547 # wait for execution task
548 await asyncio.wait([exec_task])
549
550 # cancel status task
551 status_task.cancel()
552 output, rc = exec_task.result()
553
554 else:
555
556 output, rc = await self._local_async_exec(
557 command=command, raise_exception_on_error=False, env=env
558 )
559
560 # remove temporal values yaml file
561 if file_to_delete:
562 os.remove(file_to_delete)
563
564 # write final status
565 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100566 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000567 kdu_instance=kdu_instance,
568 namespace=instance_info["namespace"],
569 db_dict=db_dict,
570 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000571 )
572
573 if rc != 0:
574 msg = "Error executing command: {}\nOutput: {}".format(command, output)
575 self.log.error(msg)
576 raise K8sException(msg)
577
578 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100579 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000580
581 # return new revision number
582 instance = await self.get_instance_info(
583 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
584 )
585 if instance:
586 revision = int(instance.get("revision"))
587 self.log.debug("New revision: {}".format(revision))
588 return revision
589 else:
590 return 0
591
aktas2962f3e2021-03-15 11:05:35 +0300592 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100593 self,
594 kdu_instance: str,
595 scale: int,
596 resource_name: str,
597 total_timeout: float = 1800,
aktas867418c2021-10-19 18:26:13 +0300598 cluster_uuid: str = None,
599 kdu_model: str = None,
600 atomic: bool = True,
601 db_dict: dict = None,
garciadeblas82b591c2021-03-24 09:22:13 +0100602 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300603 ):
aktas867418c2021-10-19 18:26:13 +0300604 """Scale a resource in a Helm Chart.
605
606 Args:
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
617
618 Returns:
619 True if successful, False otherwise
620 """
621
Pedro Escaleirab41de172022-04-02 00:44:08 +0100622 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300623 if resource_name:
624 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100625 resource_name, kdu_model, cluster_uuid
aktas867418c2021-10-19 18:26:13 +0300626 )
627
628 self.log.debug(debug_mgs)
629
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
633 if not instance_info:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance))
635
636 # init env, paths
637 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100638 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300639 )
640
641 # version
642 kdu_model, version = self._split_version(kdu_model)
643
644 repo_url = await self._find_repo(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300645
646 _, replica_str = await self._get_replica_count_url(
647 kdu_model, repo_url, resource_name
648 )
649
650 command = self._get_upgrade_scale_command(
651 kdu_model,
652 kdu_instance,
653 instance_info["namespace"],
654 scale,
655 version,
656 atomic,
657 replica_str,
658 total_timeout,
659 resource_name,
660 paths["kube_config"],
661 )
662
663 self.log.debug("scaling: {}".format(command))
664
665 if atomic:
666 # exec helm in a task
667 exec_task = asyncio.ensure_future(
668 coro_or_future=self._local_async_exec(
669 command=command, raise_exception_on_error=False, env=env
670 )
671 )
672 # write status in another task
673 status_task = asyncio.ensure_future(
674 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100675 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300676 kdu_instance=kdu_instance,
677 namespace=instance_info["namespace"],
678 db_dict=db_dict,
679 operation="scale",
aktas867418c2021-10-19 18:26:13 +0300680 )
681 )
682
683 # wait for execution task
684 await asyncio.wait([exec_task])
685
686 # cancel status task
687 status_task.cancel()
688 output, rc = exec_task.result()
689
690 else:
691 output, rc = await self._local_async_exec(
692 command=command, raise_exception_on_error=False, env=env
693 )
694
695 # write final status
696 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100697 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300698 kdu_instance=kdu_instance,
699 namespace=instance_info["namespace"],
700 db_dict=db_dict,
701 operation="scale",
aktas867418c2021-10-19 18:26:13 +0300702 )
703
704 if rc != 0:
705 msg = "Error executing command: {}\nOutput: {}".format(command, output)
706 self.log.error(msg)
707 raise K8sException(msg)
708
709 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100710 self.fs.reverse_sync(from_path=cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300711
712 return True
aktas2962f3e2021-03-15 11:05:35 +0300713
714 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100715 self,
716 resource_name: str,
717 kdu_instance: str,
aktas867418c2021-10-19 18:26:13 +0300718 cluster_uuid: str,
719 kdu_model: str,
garciadeblas82b591c2021-03-24 09:22:13 +0100720 **kwargs,
aktas867418c2021-10-19 18:26:13 +0300721 ) -> int:
722 """Get a resource scale count.
723
724 Args:
725 cluster_uuid: The UUID of the cluster
726 resource_name: Resource name
727 kdu_instance: KDU instance name
Pedro Escaleira547f8232022-06-03 19:48:46 +0100728 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +0300729 kwargs: Additional parameters
730
731 Returns:
732 Resource instance count
733 """
734
aktas867418c2021-10-19 18:26:13 +0300735 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100736 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300737 )
738
739 # look for instance to obtain namespace
740 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
741 if not instance_info:
742 raise K8sException("kdu_instance {} not found".format(kdu_instance))
743
744 # init env, paths
745 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100746 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300747 )
748
749 replicas = await self._get_replica_count_instance(
Pedro Escaleiraaa5deb72022-06-05 01:29:57 +0100750 kdu_instance=kdu_instance,
751 namespace=instance_info["namespace"],
752 kubeconfig=paths["kube_config"],
753 resource_name=resource_name,
aktas867418c2021-10-19 18:26:13 +0300754 )
755
756 # Get default value if scale count is not found from provided values
757 if not replicas:
Pedro Escaleira547f8232022-06-03 19:48:46 +0100758 repo_url = await self._find_repo(
759 kdu_model=kdu_model, cluster_uuid=cluster_uuid
760 )
aktas867418c2021-10-19 18:26:13 +0300761 replicas, _ = await self._get_replica_count_url(
Pedro Escaleira547f8232022-06-03 19:48:46 +0100762 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
aktas867418c2021-10-19 18:26:13 +0300763 )
764
765 if not replicas:
766 msg = "Replica count not found. Cannot be scaled"
767 self.log.error(msg)
768 raise K8sException(msg)
769
770 return int(replicas)
aktas2962f3e2021-03-15 11:05:35 +0300771
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000772 async def rollback(
773 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
774 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000775 self.log.debug(
776 "rollback kdu_instance {} to revision {} from cluster {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100777 kdu_instance, revision, cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000778 )
779 )
780
781 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100782 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000783
784 # look for instance to obtain namespace
785 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
786 if not instance_info:
787 raise K8sException("kdu_instance {} not found".format(kdu_instance))
788
789 # init env, paths
790 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100791 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000792 )
793
bravof7bd5c6a2021-11-17 11:14:57 -0300794 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100795 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300796
garciadeblas82b591c2021-03-24 09:22:13 +0100797 command = self._get_rollback_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300798 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
garciadeblas82b591c2021-03-24 09:22:13 +0100799 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000800
801 self.log.debug("rolling_back: {}".format(command))
802
803 # exec helm in a task
804 exec_task = asyncio.ensure_future(
805 coro_or_future=self._local_async_exec(
806 command=command, raise_exception_on_error=False, env=env
807 )
808 )
809 # write status in another task
810 status_task = asyncio.ensure_future(
811 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100812 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000813 kdu_instance=kdu_instance,
814 namespace=instance_info["namespace"],
815 db_dict=db_dict,
816 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000817 )
818 )
819
820 # wait for execution task
821 await asyncio.wait([exec_task])
822
823 # cancel status task
824 status_task.cancel()
825
826 output, rc = exec_task.result()
827
828 # write final status
829 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100830 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000831 kdu_instance=kdu_instance,
832 namespace=instance_info["namespace"],
833 db_dict=db_dict,
834 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000835 )
836
837 if rc != 0:
838 msg = "Error executing command: {}\nOutput: {}".format(command, output)
839 self.log.error(msg)
840 raise K8sException(msg)
841
842 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100843 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000844
845 # return new revision number
846 instance = await self.get_instance_info(
847 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
848 )
849 if instance:
850 revision = int(instance.get("revision"))
851 self.log.debug("New revision: {}".format(revision))
852 return revision
853 else:
854 return 0
855
David Garciaeb8943a2021-04-12 12:07:37 +0200856 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000857 """
858 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
859 (this call should happen after all _terminate-config-primitive_ of the VNF
860 are invoked).
861
862 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
863 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200864 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000865 :return: True if successful
866 """
867
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000868 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100869 "uninstall kdu_instance {} from cluster {}".format(
870 kdu_instance, cluster_uuid
871 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000872 )
873
874 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100875 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000876
877 # look for instance to obtain namespace
878 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
879 if not instance_info:
David Garcia7add1872021-08-18 14:52:52 +0200880 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
881 return True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000882 # init env, paths
883 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100884 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000885 )
886
bravof7bd5c6a2021-11-17 11:14:57 -0300887 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100888 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300889
890 command = self._get_uninstall_command(
891 kdu_instance, instance_info["namespace"], paths["kube_config"]
892 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000893 output, _rc = await self._local_async_exec(
894 command=command, raise_exception_on_error=True, env=env
895 )
896
897 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100898 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000899
900 return self._output_to_table(output)
901
902 async def instances_list(self, cluster_uuid: str) -> list:
903 """
904 returns a list of deployed releases in a cluster
905
906 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
907 :return:
908 """
909
Pedro Escaleirab41de172022-04-02 00:44:08 +0100910 self.log.debug("list releases for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000911
912 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100913 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000914
915 # execute internal command
Pedro Escaleirab41de172022-04-02 00:44:08 +0100916 result = await self._instances_list(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000917
918 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100919 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000920
921 return result
922
923 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
924 instances = await self.instances_list(cluster_uuid=cluster_uuid)
925 for instance in instances:
926 if instance.get("name") == kdu_instance:
927 return instance
928 self.log.debug("Instance {} not found".format(kdu_instance))
929 return None
930
aticig8070c3c2022-04-18 00:31:42 +0300931 async def upgrade_charm(
932 self,
933 ee_id: str = None,
934 path: str = None,
935 charm_id: str = None,
936 charm_type: str = None,
937 timeout: float = None,
938 ) -> str:
939 """This method upgrade charms in VNFs
940
941 Args:
942 ee_id: Execution environment id
943 path: Local path to the charm
944 charm_id: charm-id
945 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
946 timeout: (Float) Timeout for the ns update operation
947
948 Returns:
949 The output of the update operation if status equals to "completed"
950 """
951 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
952
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000953 async def exec_primitive(
954 self,
955 cluster_uuid: str = None,
956 kdu_instance: str = None,
957 primitive_name: str = None,
958 timeout: float = 300,
959 params: dict = None,
960 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200961 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000962 ) -> str:
963 """Exec primitive (Juju action)
964
965 :param cluster_uuid: The UUID of the cluster or namespace:cluster
966 :param kdu_instance: The unique name of the KDU instance
967 :param primitive_name: Name of action that will be executed
968 :param timeout: Timeout for action execution
969 :param params: Dictionary of all the parameters needed for the action
970 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200971 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000972
973 :return: Returns the output of the action
974 """
975 raise K8sException(
976 "KDUs deployed with Helm don't support actions "
977 "different from rollback, upgrade and status"
978 )
979
garciadeblas82b591c2021-03-24 09:22:13 +0100980 async def get_services(
981 self, cluster_uuid: str, kdu_instance: str, namespace: str
982 ) -> list:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000983 """
984 Returns a list of services defined for the specified kdu instance.
985
986 :param cluster_uuid: UUID of a K8s cluster known by OSM
987 :param kdu_instance: unique name for the KDU instance
988 :param namespace: K8s namespace used by the KDU instance
989 :return: If successful, it will return a list of services, Each service
990 can have the following data:
991 - `name` of the service
992 - `type` type of service in the k8 cluster
993 - `ports` List of ports offered by the service, for each port includes at least
994 name, port, protocol
995 - `cluster_ip` Internal ip to be used inside k8s cluster
996 - `external_ip` List of external ips (in case they are available)
997 """
998
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000999 self.log.debug(
1000 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1001 cluster_uuid, kdu_instance
1002 )
1003 )
1004
bravof7bd5c6a2021-11-17 11:14:57 -03001005 # init env, paths
1006 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001007 cluster_name=cluster_uuid, create_if_not_exist=True
bravof7bd5c6a2021-11-17 11:14:57 -03001008 )
1009
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001010 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001011 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001012
1013 # get list of services names for kdu
bravof7bd5c6a2021-11-17 11:14:57 -03001014 service_names = await self._get_services(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001015 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
bravof7bd5c6a2021-11-17 11:14:57 -03001016 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001017
1018 service_list = []
1019 for service in service_names:
Pedro Escaleirab41de172022-04-02 00:44:08 +01001020 service = await self._get_service(cluster_uuid, service, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001021 service_list.append(service)
1022
1023 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001024 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001025
1026 return service_list
1027
garciadeblas82b591c2021-03-24 09:22:13 +01001028 async def get_service(
1029 self, cluster_uuid: str, service_name: str, namespace: str
1030 ) -> object:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001031
1032 self.log.debug(
1033 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
garciadeblas82b591c2021-03-24 09:22:13 +01001034 service_name, namespace, cluster_uuid
1035 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001036 )
1037
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001038 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001039 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001040
Pedro Escaleirab41de172022-04-02 00:44:08 +01001041 service = await self._get_service(cluster_uuid, service_name, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001042
1043 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001044 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001045
1046 return service
1047
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001048 async def status_kdu(
1049 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1050 ) -> Union[str, dict]:
David Garciaeb8943a2021-04-12 12:07:37 +02001051 """
1052 This call would retrieve tha current state of a given KDU instance. It would be
1053 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1054 values_ of the configuration parameters applied to a given instance. This call
1055 would be based on the `status` call.
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001056
David Garciaeb8943a2021-04-12 12:07:37 +02001057 :param cluster_uuid: UUID of a K8s cluster known by OSM
1058 :param kdu_instance: unique name for the KDU instance
1059 :param kwargs: Additional parameters (None yet)
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001060 :param yaml_format: if the return shall be returned as an YAML string or as a
1061 dictionary
David Garciaeb8943a2021-04-12 12:07:37 +02001062 :return: If successful, it will return the following vector of arguments:
1063 - K8s `namespace` in the cluster where the KDU lives
1064 - `state` of the KDU instance. It can be:
1065 - UNKNOWN
1066 - DEPLOYED
1067 - DELETED
1068 - SUPERSEDED
1069 - FAILED or
1070 - DELETING
1071 - List of `resources` (objects) that this release consists of, sorted by kind,
1072 and the status of those resources
1073 - Last `deployment_time`.
1074
1075 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001076 self.log.debug(
1077 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1078 cluster_uuid, kdu_instance
1079 )
1080 )
1081
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001082 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001083 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001084
1085 # get instance: needed to obtain namespace
Pedro Escaleirab41de172022-04-02 00:44:08 +01001086 instances = await self._instances_list(cluster_id=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001087 for instance in instances:
1088 if instance.get("name") == kdu_instance:
1089 break
1090 else:
1091 # instance does not exist
garciadeblas82b591c2021-03-24 09:22:13 +01001092 raise K8sException(
1093 "Instance name: {} not found in cluster: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001094 kdu_instance, cluster_uuid
garciadeblas82b591c2021-03-24 09:22:13 +01001095 )
1096 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001097
1098 status = await self._status_kdu(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001099 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001100 kdu_instance=kdu_instance,
1101 namespace=instance["namespace"],
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001102 yaml_format=yaml_format,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001103 show_error_log=True,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001104 )
1105
1106 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001107 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001108
1109 return status
1110
aktas867418c2021-10-19 18:26:13 +03001111 async def get_values_kdu(
1112 self, kdu_instance: str, namespace: str, kubeconfig: str
1113 ) -> str:
1114
1115 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1116
1117 return await self._exec_get_command(
1118 get_command="values",
1119 kdu_instance=kdu_instance,
1120 namespace=namespace,
1121 kubeconfig=kubeconfig,
1122 )
1123
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001124 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001125 """Method to obtain the Helm Chart package's values
1126
1127 Args:
1128 kdu_model: The name or path of an Helm Chart
1129 repo_url: Helm Chart repository url
1130
1131 Returns:
1132 str: the values of the Helm Chart package
1133 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001134
1135 self.log.debug(
1136 "inspect kdu_model values {} from (optional) repo: {}".format(
1137 kdu_model, repo_url
1138 )
1139 )
1140
aktas867418c2021-10-19 18:26:13 +03001141 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001142 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1143 )
1144
1145 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1146
1147 self.log.debug(
1148 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1149 )
1150
aktas867418c2021-10-19 18:26:13 +03001151 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001152 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1153 )
1154
1155 async def synchronize_repos(self, cluster_uuid: str):
1156
1157 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1158 try:
1159 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1160 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1161
1162 local_repo_list = await self.repo_list(cluster_uuid)
1163 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1164
1165 deleted_repo_list = []
1166 added_repo_dict = {}
1167
1168 # iterate over the list of repos in the database that should be
1169 # added if not present
1170 for repo_name, db_repo in db_repo_dict.items():
1171 try:
1172 # check if it is already present
1173 curr_repo_url = local_repo_dict.get(db_repo["name"])
1174 repo_id = db_repo.get("_id")
1175 if curr_repo_url != db_repo["url"]:
1176 if curr_repo_url:
garciadeblas82b591c2021-03-24 09:22:13 +01001177 self.log.debug(
1178 "repo {} url changed, delete and and again".format(
1179 db_repo["url"]
1180 )
1181 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001182 await self.repo_remove(cluster_uuid, db_repo["name"])
1183 deleted_repo_list.append(repo_id)
1184
1185 # add repo
1186 self.log.debug("add repo {}".format(db_repo["name"]))
bravof0ab522f2021-11-23 19:33:18 -03001187 if "ca_cert" in db_repo:
1188 await self.repo_add(
1189 cluster_uuid,
1190 db_repo["name"],
1191 db_repo["url"],
1192 cert=db_repo["ca_cert"],
1193 )
1194 else:
1195 await self.repo_add(
1196 cluster_uuid,
1197 db_repo["name"],
1198 db_repo["url"],
1199 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001200 added_repo_dict[repo_id] = db_repo["name"]
1201 except Exception as e:
1202 raise K8sException(
1203 "Error adding repo id: {}, err_msg: {} ".format(
1204 repo_id, repr(e)
1205 )
1206 )
1207
1208 # Delete repos that are present but not in nbi_list
1209 for repo_name in local_repo_dict:
1210 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1211 self.log.debug("delete repo {}".format(repo_name))
1212 try:
1213 await self.repo_remove(cluster_uuid, repo_name)
1214 deleted_repo_list.append(repo_name)
1215 except Exception as e:
1216 self.warning(
1217 "Error deleting repo, name: {}, err_msg: {}".format(
1218 repo_name, str(e)
1219 )
1220 )
1221
1222 return deleted_repo_list, added_repo_dict
1223
1224 except K8sException:
1225 raise
1226 except Exception as e:
1227 # Do not raise errors synchronizing repos
1228 self.log.error("Error synchronizing repos: {}".format(e))
1229 raise Exception("Error synchronizing repos: {}".format(e))
1230
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001231 def _get_db_repos_dict(self, repo_ids: list):
1232 db_repos_dict = {}
1233 for repo_id in repo_ids:
1234 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1235 db_repos_dict[db_repo["name"]] = db_repo
1236 return db_repos_dict
1237
1238 """
1239 ####################################################################################
1240 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1241 ####################################################################################
1242 """
1243
1244 @abc.abstractmethod
1245 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1246 """
1247 Creates and returns base cluster and kube dirs and returns them.
1248 Also created helm3 dirs according to new directory specification, paths are
1249 not returned but assigned to helm environment variables
1250
1251 :param cluster_name: cluster_name
1252 :return: Dictionary with config_paths and dictionary with helm environment variables
1253 """
1254
1255 @abc.abstractmethod
1256 async def _cluster_init(self, cluster_id, namespace, paths, env):
1257 """
1258 Implements the helm version dependent cluster initialization
1259 """
1260
1261 @abc.abstractmethod
1262 async def _instances_list(self, cluster_id):
1263 """
1264 Implements the helm version dependent helm instances list
1265 """
1266
1267 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001268 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001269 """
1270 Implements the helm version dependent method to obtain services from a helm instance
1271 """
1272
1273 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001274 async def _status_kdu(
1275 self,
1276 cluster_id: str,
1277 kdu_instance: str,
1278 namespace: str = None,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001279 yaml_format: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001280 show_error_log: bool = False,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001281 ) -> Union[str, dict]:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001282 """
1283 Implements the helm version dependent method to obtain status of a helm instance
1284 """
1285
1286 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001287 def _get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001288 self,
1289 kdu_model,
1290 kdu_instance,
1291 namespace,
1292 params_str,
1293 version,
1294 atomic,
1295 timeout,
1296 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001297 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001298 """
1299 Obtain command to be executed to delete the indicated instance
1300 """
1301
1302 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001303 def _get_upgrade_scale_command(
1304 self,
1305 kdu_model,
1306 kdu_instance,
1307 namespace,
1308 count,
1309 version,
1310 atomic,
1311 replicas,
1312 timeout,
1313 resource_name,
1314 kubeconfig,
1315 ) -> str:
Pedro Escaleira44bd0682022-07-07 22:18:35 +01001316 """Generates the command to scale a Helm Chart release
1317
1318 Args:
1319 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1320 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1321 namespace (str): Namespace where this KDU instance is deployed
1322 scale (int): Scale count
1323 version (str): Constraint with specific version of the Chart to use
1324 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1325 The --wait flag will be set automatically if --atomic is used
1326 replica_str (str): The key under resource_name key where the scale count is stored
1327 timeout (float): The time, in seconds, to wait
1328 resource_name (str): The KDU's resource to scale
1329 kubeconfig (str): Kubeconfig file path
1330
1331 Returns:
1332 str: command to scale a Helm Chart release
1333 """
aktas867418c2021-10-19 18:26:13 +03001334
1335 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001336 def _get_upgrade_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001337 self,
1338 kdu_model,
1339 kdu_instance,
1340 namespace,
1341 params_str,
1342 version,
1343 atomic,
1344 timeout,
1345 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001346 ) -> str:
Pedro Escaleira44bd0682022-07-07 22:18:35 +01001347 """Generates the command to upgrade a Helm Chart release
1348
1349 Args:
1350 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1351 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1352 namespace (str): Namespace where this KDU instance is deployed
1353 params_str (str): Params used to upgrade the Helm Chart release
1354 version (str): Constraint with specific version of the Chart to use
1355 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1356 The --wait flag will be set automatically if --atomic is used
1357 timeout (float): The time, in seconds, to wait
1358 kubeconfig (str): Kubeconfig file path
1359
1360 Returns:
1361 str: command to upgrade a Helm Chart release
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001362 """
1363
1364 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001365 def _get_rollback_command(
1366 self, kdu_instance, namespace, revision, kubeconfig
1367 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001368 """
1369 Obtain command to be executed to rollback the indicated instance
1370 """
1371
1372 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001373 def _get_uninstall_command(
1374 self, kdu_instance: str, namespace: str, kubeconfig: str
1375 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001376 """
1377 Obtain command to be executed to delete the indicated instance
1378 """
1379
1380 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001381 def _get_inspect_command(
1382 self, show_command: str, kdu_model: str, repo_str: str, version: str
1383 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001384 """Generates the command to obtain the information about an Helm Chart package
1385 (´helm show ...´ command)
1386
1387 Args:
1388 show_command: the second part of the command (`helm show <show_command>`)
1389 kdu_model: The name or path of an Helm Chart
1390 repo_url: Helm Chart repository url
1391 version: constraint with specific version of the Chart to use
1392
1393 Returns:
1394 str: the generated Helm Chart command
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001395 """
1396
1397 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001398 def _get_get_command(
1399 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1400 ):
1401 """Obtain command to be executed to get information about the kdu instance."""
1402
1403 @abc.abstractmethod
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001404 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1405 """
1406 Method call to uninstall cluster software for helm. This method is dependent
1407 of helm version
1408 For Helm v2 it will be called when Tiller must be uninstalled
1409 For Helm v3 it does nothing and does not need to be callled
1410 """
1411
lloretgalleg095392b2020-11-20 11:28:08 +00001412 @abc.abstractmethod
1413 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1414 """
1415 Obtains the cluster repos identifiers
1416 """
1417
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001418 """
1419 ####################################################################################
1420 ################################### P R I V A T E ##################################
1421 ####################################################################################
1422 """
1423
1424 @staticmethod
1425 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1426 if os.path.exists(filename):
1427 return True
1428 else:
1429 msg = "File {} does not exist".format(filename)
1430 if exception_if_not_exists:
1431 raise K8sException(msg)
1432
1433 @staticmethod
1434 def _remove_multiple_spaces(strobj):
1435 strobj = strobj.strip()
1436 while " " in strobj:
1437 strobj = strobj.replace(" ", " ")
1438 return strobj
1439
1440 @staticmethod
1441 def _output_to_lines(output: str) -> list:
1442 output_lines = list()
1443 lines = output.splitlines(keepends=False)
1444 for line in lines:
1445 line = line.strip()
1446 if len(line) > 0:
1447 output_lines.append(line)
1448 return output_lines
1449
1450 @staticmethod
1451 def _output_to_table(output: str) -> list:
1452 output_table = list()
1453 lines = output.splitlines(keepends=False)
1454 for line in lines:
1455 line = line.replace("\t", " ")
1456 line_list = list()
1457 output_table.append(line_list)
1458 cells = line.split(sep=" ")
1459 for cell in cells:
1460 cell = cell.strip()
1461 if len(cell) > 0:
1462 line_list.append(cell)
1463 return output_table
1464
1465 @staticmethod
1466 def _parse_services(output: str) -> list:
1467 lines = output.splitlines(keepends=False)
1468 services = []
1469 for line in lines:
1470 line = line.replace("\t", " ")
1471 cells = line.split(sep=" ")
1472 if len(cells) > 0 and cells[0].startswith("service/"):
1473 elems = cells[0].split(sep="/")
1474 if len(elems) > 1:
1475 services.append(elems[1])
1476 return services
1477
1478 @staticmethod
1479 def _get_deep(dictionary: dict, members: tuple):
1480 target = dictionary
1481 value = None
1482 try:
1483 for m in members:
1484 value = target.get(m)
1485 if not value:
1486 return None
1487 else:
1488 target = value
1489 except Exception:
1490 pass
1491 return value
1492
1493 # find key:value in several lines
1494 @staticmethod
1495 def _find_in_lines(p_lines: list, p_key: str) -> str:
1496 for line in p_lines:
1497 try:
1498 if line.startswith(p_key + ":"):
1499 parts = line.split(":")
1500 the_value = parts[1].strip()
1501 return the_value
1502 except Exception:
1503 # ignore it
1504 pass
1505 return None
1506
1507 @staticmethod
1508 def _lower_keys_list(input_list: list):
1509 """
1510 Transform the keys in a list of dictionaries to lower case and returns a new list
1511 of dictionaries
1512 """
1513 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001514 if input_list:
1515 for dictionary in input_list:
1516 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1517 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001518 return new_list
1519
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001520 async def _local_async_exec(
1521 self,
1522 command: str,
1523 raise_exception_on_error: bool = False,
1524 show_error_log: bool = True,
1525 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001526 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001527 ) -> (str, int):
1528
1529 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001530 self.log.debug(
1531 "Executing async local command: {}, env: {}".format(command, env)
1532 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001533
1534 # split command
1535 command = shlex.split(command)
1536
1537 environ = os.environ.copy()
1538 if env:
1539 environ.update(env)
1540
1541 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001542 async with self.cmd_lock:
1543 process = await asyncio.create_subprocess_exec(
1544 *command,
1545 stdout=asyncio.subprocess.PIPE,
1546 stderr=asyncio.subprocess.PIPE,
1547 env=environ,
1548 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001549
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001550 # wait for command terminate
1551 stdout, stderr = await process.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001552
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001553 return_code = process.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001554
1555 output = ""
1556 if stdout:
1557 output = stdout.decode("utf-8").strip()
1558 # output = stdout.decode()
1559 if stderr:
1560 output = stderr.decode("utf-8").strip()
1561 # output = stderr.decode()
1562
1563 if return_code != 0 and show_error_log:
1564 self.log.debug(
1565 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1566 )
1567 else:
1568 self.log.debug("Return code: {}".format(return_code))
1569
1570 if raise_exception_on_error and return_code != 0:
1571 raise K8sException(output)
1572
1573 if encode_utf8:
1574 output = output.encode("utf-8").strip()
1575 output = str(output).replace("\\n", "\n")
1576
1577 return output, return_code
1578
1579 except asyncio.CancelledError:
1580 raise
1581 except K8sException:
1582 raise
1583 except Exception as e:
1584 msg = "Exception executing command: {} -> {}".format(command, e)
1585 self.log.error(msg)
1586 if raise_exception_on_error:
1587 raise K8sException(e) from e
1588 else:
1589 return "", -1
1590
garciadeblas82b591c2021-03-24 09:22:13 +01001591 async def _local_async_exec_pipe(
1592 self,
1593 command1: str,
1594 command2: str,
1595 raise_exception_on_error: bool = True,
1596 show_error_log: bool = True,
1597 encode_utf8: bool = False,
1598 env: dict = None,
1599 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001600
1601 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1602 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1603 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001604 self.log.debug(
1605 "Executing async local command: {}, env: {}".format(command, env)
1606 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001607
1608 # split command
1609 command1 = shlex.split(command1)
1610 command2 = shlex.split(command2)
1611
1612 environ = os.environ.copy()
1613 if env:
1614 environ.update(env)
1615
1616 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001617 async with self.cmd_lock:
1618 read, write = os.pipe()
1619 await asyncio.create_subprocess_exec(
1620 *command1, stdout=write, env=environ
1621 )
1622 os.close(write)
1623 process_2 = await asyncio.create_subprocess_exec(
1624 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1625 )
1626 os.close(read)
1627 stdout, stderr = await process_2.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001628
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001629 return_code = process_2.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001630
1631 output = ""
1632 if stdout:
1633 output = stdout.decode("utf-8").strip()
1634 # output = stdout.decode()
1635 if stderr:
1636 output = stderr.decode("utf-8").strip()
1637 # output = stderr.decode()
1638
1639 if return_code != 0 and show_error_log:
1640 self.log.debug(
1641 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1642 )
1643 else:
1644 self.log.debug("Return code: {}".format(return_code))
1645
1646 if raise_exception_on_error and return_code != 0:
1647 raise K8sException(output)
1648
1649 if encode_utf8:
1650 output = output.encode("utf-8").strip()
1651 output = str(output).replace("\\n", "\n")
1652
1653 return output, return_code
1654 except asyncio.CancelledError:
1655 raise
1656 except K8sException:
1657 raise
1658 except Exception as e:
1659 msg = "Exception executing command: {} -> {}".format(command, e)
1660 self.log.error(msg)
1661 if raise_exception_on_error:
1662 raise K8sException(e) from e
1663 else:
1664 return "", -1
1665
1666 async def _get_service(self, cluster_id, service_name, namespace):
1667 """
1668 Obtains the data of the specified service in the k8cluster.
1669
1670 :param cluster_id: id of a K8s cluster known by OSM
1671 :param service_name: name of the K8s service in the specified namespace
1672 :param namespace: K8s namespace used by the KDU instance
1673 :return: If successful, it will return a service with the following data:
1674 - `name` of the service
1675 - `type` type of service in the k8 cluster
1676 - `ports` List of ports offered by the service, for each port includes at least
1677 name, port, protocol
1678 - `cluster_ip` Internal ip to be used inside k8s cluster
1679 - `external_ip` List of external ips (in case they are available)
1680 """
1681
1682 # init config, env
1683 paths, env = self._init_paths_env(
1684 cluster_name=cluster_id, create_if_not_exist=True
1685 )
1686
1687 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1688 self.kubectl_command, paths["kube_config"], namespace, service_name
1689 )
1690
1691 output, _rc = await self._local_async_exec(
1692 command=command, raise_exception_on_error=True, env=env
1693 )
1694
1695 data = yaml.load(output, Loader=yaml.SafeLoader)
1696
1697 service = {
1698 "name": service_name,
1699 "type": self._get_deep(data, ("spec", "type")),
1700 "ports": self._get_deep(data, ("spec", "ports")),
garciadeblas82b591c2021-03-24 09:22:13 +01001701 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001702 }
1703 if service["type"] == "LoadBalancer":
1704 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1705 ip_list = [elem["ip"] for elem in ip_map_list]
1706 service["external_ip"] = ip_list
1707
1708 return service
1709
aktas867418c2021-10-19 18:26:13 +03001710 async def _exec_get_command(
1711 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1712 ):
1713 """Obtains information about the kdu instance."""
1714
1715 full_command = self._get_get_command(
1716 get_command, kdu_instance, namespace, kubeconfig
1717 )
1718
1719 output, _rc = await self._local_async_exec(command=full_command)
1720
1721 return output
1722
1723 async def _exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001724 self, inspect_command: str, kdu_model: str, repo_url: str = None
1725 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001726 """Obtains information about an Helm Chart package (´helm show´ command)
1727
1728 Args:
1729 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1730 kdu_model: The name or path of an Helm Chart
1731 repo_url: Helm Chart repository url
1732
1733 Returns:
1734 str: the requested info about the Helm Chart package
1735 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001736
1737 repo_str = ""
1738 if repo_url:
1739 repo_str = " --repo {}".format(repo_url)
1740
1741 idx = kdu_model.find("/")
1742 if idx >= 0:
1743 idx += 1
1744 kdu_model = kdu_model[idx:]
1745
aktas867418c2021-10-19 18:26:13 +03001746 kdu_model, version = self._split_version(kdu_model)
1747 if version:
1748 version_str = "--version {}".format(version)
1749 else:
1750 version_str = ""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001751
garciadeblas82b591c2021-03-24 09:22:13 +01001752 full_command = self._get_inspect_command(
aktas867418c2021-10-19 18:26:13 +03001753 inspect_command, kdu_model, repo_str, version_str
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001754 )
1755
aktas867418c2021-10-19 18:26:13 +03001756 output, _rc = await self._local_async_exec(command=full_command)
1757
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001758 return output
1759
aktas867418c2021-10-19 18:26:13 +03001760 async def _get_replica_count_url(
1761 self,
1762 kdu_model: str,
Pedro Escaleira547f8232022-06-03 19:48:46 +01001763 repo_url: str = None,
aktas867418c2021-10-19 18:26:13 +03001764 resource_name: str = None,
1765 ):
1766 """Get the replica count value in the Helm Chart Values.
1767
1768 Args:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001769 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +03001770 repo_url: Helm Chart repository url
1771 resource_name: Resource name
1772
1773 Returns:
1774 True if replicas, False replicaCount
1775 """
1776
1777 kdu_values = yaml.load(
Pedro Escaleira547f8232022-06-03 19:48:46 +01001778 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1779 Loader=yaml.SafeLoader,
aktas867418c2021-10-19 18:26:13 +03001780 )
1781
1782 if not kdu_values:
1783 raise K8sException(
1784 "kdu_values not found for kdu_model {}".format(kdu_model)
1785 )
1786
1787 if resource_name:
1788 kdu_values = kdu_values.get(resource_name, None)
1789
1790 if not kdu_values:
1791 msg = "resource {} not found in the values in model {}".format(
1792 resource_name, kdu_model
1793 )
1794 self.log.error(msg)
1795 raise K8sException(msg)
1796
1797 duplicate_check = False
1798
1799 replica_str = ""
1800 replicas = None
1801
1802 if kdu_values.get("replicaCount", None):
1803 replicas = kdu_values["replicaCount"]
1804 replica_str = "replicaCount"
1805 elif kdu_values.get("replicas", None):
1806 duplicate_check = True
1807 replicas = kdu_values["replicas"]
1808 replica_str = "replicas"
1809 else:
1810 if resource_name:
1811 msg = (
1812 "replicaCount or replicas not found in the resource"
1813 "{} values in model {}. Cannot be scaled".format(
1814 resource_name, kdu_model
1815 )
1816 )
1817 else:
1818 msg = (
1819 "replicaCount or replicas not found in the values"
1820 "in model {}. Cannot be scaled".format(kdu_model)
1821 )
1822 self.log.error(msg)
1823 raise K8sException(msg)
1824
1825 # Control if replicas and replicaCount exists at the same time
1826 msg = "replicaCount and replicas are exists at the same time"
1827 if duplicate_check:
1828 if "replicaCount" in kdu_values:
1829 self.log.error(msg)
1830 raise K8sException(msg)
1831 else:
1832 if "replicas" in kdu_values:
1833 self.log.error(msg)
1834 raise K8sException(msg)
1835
1836 return replicas, replica_str
1837
1838 async def _get_replica_count_instance(
1839 self,
1840 kdu_instance: str,
1841 namespace: str,
1842 kubeconfig: str,
1843 resource_name: str = None,
1844 ):
1845 """Get the replica count value in the instance.
1846
1847 Args:
1848 kdu_instance: The name of the KDU instance
1849 namespace: KDU instance namespace
1850 kubeconfig:
1851 resource_name: Resource name
1852
1853 Returns:
1854 True if replicas, False replicaCount
1855 """
1856
1857 kdu_values = yaml.load(
1858 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1859 Loader=yaml.SafeLoader,
1860 )
1861
1862 replicas = None
1863
1864 if kdu_values:
1865 resource_values = (
1866 kdu_values.get(resource_name, None) if resource_name else None
1867 )
1868 replicas = (
1869 (
1870 resource_values.get("replicaCount", None)
1871 or resource_values.get("replicas", None)
1872 )
1873 if resource_values
1874 else (
1875 kdu_values.get("replicaCount", None)
1876 or kdu_values.get("replicas", None)
1877 )
1878 )
1879
1880 return replicas
1881
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001882 async def _store_status(
1883 self,
1884 cluster_id: str,
1885 operation: str,
1886 kdu_instance: str,
1887 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001888 db_dict: dict = None,
Pedro Escaleirab46f88d2022-04-23 19:55:45 +01001889 ) -> None:
1890 """
1891 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1892
1893 :param cluster_id (str): the cluster where the KDU instance is deployed
1894 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1895 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1896 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1897 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1898 values for the keys:
1899 - "collection": The Mongo DB collection to write to
1900 - "filter": The query filter to use in the update process
1901 - "path": The dot separated keys which targets the object to be updated
1902 Defaults to None.
1903 """
1904
1905 try:
1906 detailed_status = await self._status_kdu(
1907 cluster_id=cluster_id,
1908 kdu_instance=kdu_instance,
1909 yaml_format=False,
1910 namespace=namespace,
1911 )
1912
1913 status = detailed_status.get("info").get("description")
1914 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1915
1916 # write status to db
1917 result = await self.write_app_status_to_db(
1918 db_dict=db_dict,
1919 status=str(status),
1920 detailed_status=str(detailed_status),
1921 operation=operation,
1922 )
1923
1924 if not result:
1925 self.log.info("Error writing in database. Task exiting...")
1926
1927 except asyncio.CancelledError as e:
1928 self.log.warning(
1929 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1930 )
1931 except Exception as e:
1932 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001933
1934 # params for use in -f file
1935 # returns values file option and filename (in order to delete it at the end)
1936 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1937
1938 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001939 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001940
1941 def get_random_number():
1942 r = random.randrange(start=1, stop=99999999)
1943 s = str(r)
1944 while len(s) < 10:
1945 s = "0" + s
1946 return s
1947
1948 params2 = dict()
1949 for key in params:
1950 value = params.get(key)
1951 if "!!yaml" in str(value):
David Garcia513cb2d2022-05-31 11:01:09 +02001952 value = yaml.safe_load(value[7:])
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001953 params2[key] = value
1954
1955 values_file = get_random_number() + ".yaml"
1956 with open(values_file, "w") as stream:
1957 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1958
1959 return "-f {}".format(values_file), values_file
1960
1961 return "", None
1962
1963 # params for use in --set option
1964 @staticmethod
1965 def _params_to_set_option(params: dict) -> str:
1966 params_str = ""
1967 if params and len(params) > 0:
1968 start = True
1969 for key in params:
1970 value = params.get(key, None)
1971 if value is not None:
1972 if start:
1973 params_str += "--set "
1974 start = False
1975 else:
1976 params_str += ","
1977 params_str += "{}={}".format(key, value)
1978 return params_str
1979
1980 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01001981 def generate_kdu_instance_name(**kwargs):
1982 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001983 # check embeded chart (file or dir)
1984 if chart_name.startswith("/"):
1985 # extract file or directory name
David Garcia4ae527e2021-07-26 16:04:59 +02001986 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001987 # check URL
1988 elif "://" in chart_name:
1989 # extract last portion of URL
David Garcia4ae527e2021-07-26 16:04:59 +02001990 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001991
1992 name = ""
1993 for c in chart_name:
1994 if c.isalpha() or c.isnumeric():
1995 name += c
1996 else:
1997 name += "-"
1998 if len(name) > 35:
1999 name = name[0:35]
2000
2001 # if does not start with alpha character, prefix 'a'
2002 if not name[0].isalpha():
2003 name = "a" + name
2004
2005 name += "-"
2006
2007 def get_random_number():
2008 r = random.randrange(start=1, stop=99999999)
2009 s = str(r)
2010 s = s.rjust(10, "0")
2011 return s
2012
2013 name = name + get_random_number()
2014 return name.lower()
aktas867418c2021-10-19 18:26:13 +03002015
2016 def _split_version(self, kdu_model: str) -> (str, str):
2017 version = None
garciadeblas04393192022-06-08 15:39:24 +02002018 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
aktas867418c2021-10-19 18:26:13 +03002019 parts = kdu_model.split(sep=":")
2020 if len(parts) == 2:
2021 version = str(parts[1])
2022 kdu_model = parts[0]
2023 return kdu_model, version
2024
limon537d2932022-07-21 13:55:55 +02002025 def _split_repo(self, kdu_model: str) -> str:
garciadeblas7faf4ec2022-04-08 22:53:25 +02002026 repo_name = None
2027 idx = kdu_model.find("/")
2028 if idx >= 0:
2029 repo_name = kdu_model[:idx]
2030 return repo_name
2031
aktas867418c2021-10-19 18:26:13 +03002032 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01002033 """Obtain the Helm repository for an Helm Chart
2034
2035 Args:
2036 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2037 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2038
2039 Returns:
2040 str: the repository URL; if Helm Chart is a local one, the function returns None
2041 """
2042
aktas867418c2021-10-19 18:26:13 +03002043 repo_url = None
2044 idx = kdu_model.find("/")
2045 if idx >= 0:
2046 repo_name = kdu_model[:idx]
2047 # Find repository link
2048 local_repo_list = await self.repo_list(cluster_uuid)
2049 for repo in local_repo_list:
2050 repo_url = repo["url"] if repo["name"] == repo_name else None
2051 return repo_url