blob: 65f898cc41465fba0b3b96f7845466841f7b662b [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
Gabriel Cubafb03e902022-10-07 11:40:03 -050037from n2vc.kubectl import Kubectl
lloretgalleg1c83f2e2020-10-22 09:12:35 +000038
39
40class K8sHelmBaseConnector(K8sConnector):
41
42 """
43 ####################################################################################
44 ################################### P U B L I C ####################################
45 ####################################################################################
46 """
garciadeblas82b591c2021-03-24 09:22:13 +010047
lloretgalleg1c83f2e2020-10-22 09:12:35 +000048 service_account = "osm"
49
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Prepare the state shared by the helm based K8s connectors

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # parent class receives the database, the logger and the db callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # release names are generated from random numbers: seed with the clock
    random.seed(time.time())

    # the file system
    self.fs = fs

    # both executables are checked with exception_if_not_exists=True,
    # kubectl first and helm second
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url is read from the configuration; the literal string
    # "None" is treated as "no url configured"
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url

    # Lock to avoid concurrent execution of helm commands
    self.cmd_lock = asyncio.Lock()
96
def _get_namespace(self, cluster_uuid: str) -> Union[str, None]:
    """
    Obtains the namespace used by the cluster with the uuid passed by argument

    :param cluster_uuid: cluster's uuid
    :return: value of the "namespace" field of the cluster record, or None when
        no record is returned for that uuid or the record has no such field
    """

    # The lookup is done with fail_on_empty=False, i.e. it is not asked to fail
    # when nothing matches, so the result has to be checked before it is used.
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    if not k8scluster:
        return None
    return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000109
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """

    # a new uuid is generated unless the caller asks to reuse an existing one
    cluster_id = reuse_cluster_uuid if reuse_cluster_uuid else str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The kubeconfig holds credentials: create it owner read/write only from the
    # very first moment. The third positional argument of the builtin open() is
    # the buffer size, not the permission bits, so the permissions have to be
    # given to os.open() instead.
    mode = stat.S_IRUSR | stat.S_IWUSR
    kube_config_fd = os.open(
        paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode
    )
    with os.fdopen(kube_config_fd, "w") as f:
        f.write(k8s_creds)
    # os.open() only applies the mode when it creates the file: enforce it as
    # well when an already existing file has been overwritten
    os.chmod(paths["kube_config"], mode)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000157
async def repo_add(
    self,
    cluster_uuid: str,
    name: str,
    url: str,
    repo_type: str = "chart",
    cert: str = None,
    user: str = None,
    password: str = None,
):
    """
    Add a helm repository to the helm environment of a cluster and update it

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name given to the repository
    :param url: url of the repository
    :param repo_type: kind of repository; only used in the log message
    :param cert: optional CA certificate content; it is written to a file under
        the cluster directory and passed to helm with --ca-file
    :param user: optional user name, passed to helm with --username
    :param password: optional password, passed to helm with --password
    :return: None. The "repo add" command is executed with
        raise_exception_on_error=True, the following "repo update" with
        raise_exception_on_error=False
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo add name url
    command = ("env KUBECONFIG={} {} repo add {} {}").format(
        paths["kube_config"], self._helm_command, name, url
    )

    if cert:
        temp_cert_file = os.path.join(
            self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
        )
        os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
        with open(temp_cert_file, "w") as the_cert:
            the_cert.write(cert)
        command += " --ca-file {}".format(temp_cert_file)

    if user:
        command += " --username={}".format(user)

    # The password must not end up in the logs: the command that is logged is
    # the one built so far plus a redacted password option.
    # NOTE(review): user and password are appended unquoted; whether quoting is
    # needed depends on how _local_async_exec splits the command - confirm.
    loggable_command = command
    if password:
        command += " --password={}".format(password)
        loggable_command += " --password=******"

    self.log.debug("adding repo: {}".format(loggable_command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # helm repo update
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, name
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000218
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Run "helm repo update" for one repository of a cluster

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name of the repository to update
    :param repo_type: kind of repository; only used in the log message
    """
    self.log.debug(
        f"Cluster {cluster_uuid}, updating {repo_type} repository {name}"
    )

    # init_env (only the environment is needed here), then sync local dir
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update, executed without asking for an exception on error
    update_cmd = f"{self._helm_command} repo update {name}"
    self.log.debug(f"updating repo: {update_cmd}")
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
243
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: uuid of the cluster whose helm environment is queried
    :return: list of registered repositories: [ (name, url) .... ]; an empty
        list when the command fails or prints nothing
    """
    self.log.debug(f"list repositories for cluster {cluster_uuid}")

    # config filename
    paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    list_cmd = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # No exception is requested: when there are no repos an empty list is wanted
    output, return_code = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=False, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # failed command or empty output: nothing to report
    if return_code != 0 or not output:
        return []

    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed_repos)
282
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the helm environment of a cluster

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name of the repository to remove
    :return: None; the helm command is executed with
        raise_exception_on_error=True
    """
    self.log.debug(f"remove {name} repositories for cluster {cluster_uuid}")

    # init env, paths
    paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    kube_config = paths["kube_config"]
    remove_cmd = f"env KUBECONFIG={kube_config} {self._helm_command} repo remove {name}"
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000305
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: when the cluster still has releases
        they are uninstalled; without it the releases are kept and the software
        is not uninstalled
    :param uninstall_sw: Boolean to request that the releases and the software
        handled by _uninstall_sw are removed from the cluster
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # bound before the try so that the error message below can
                    # always be built
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_uuid)
                self.log.warning(msg)
                uninstall_sw = (
                    False  # Allow to remove k8s cluster without removing Tiller
                )

        if uninstall_sw:
            await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
377
def _is_helm_chart_a_file(self, chart_name: str):
    """
    Tell whether a chart reference is considered a file

    :param chart_name: chart reference to inspect
    :return: True when chart_name contains more than one "/" character
    """
    slash_count = chart_name.count("/")
    return slash_count >= 2
380
async def _install_impl(
    self,
    cluster_id: str,
    kdu_model: str,
    paths: dict,
    env: dict,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Build and execute the helm install command for a KDU instance

    :param cluster_id: uuid of the cluster where the chart is installed
    :param kdu_model: chart reference; the version is separated from it with
        _split_version and the repository with _split_repo
    :param paths: not used as received: it is recomputed with _init_paths_env
    :param env: not used as received: it is recomputed with _init_paths_env
    :param kdu_instance: name of the release to create
    :param atomic: passed to the install command; when set, the status is also
        written by a parallel task while helm runs
    :param timeout: passed to the install command
    :param params: values handed to _params_to_file_option
    :param db_dict: passed to _store_status
    :param kdu_name: not used by this method
    :param namespace: namespace passed to the install command and _store_status
    :raises K8sException: when the helm command returns a non-zero code
    """
    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    # From here on the temporal values file may exist: the finally clause makes
    # sure it is removed as well when one of the steps raises.
    try:
        # version
        kdu_model, version = self._split_version(kdu_model)

        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_id, repo)

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # cancel status task, also when the wait itself is interrupted
                status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="install",
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)
475
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    namespace: str = None,
    force: bool = False,
):
    """
    Upgrade a KDU instance with helm and return its new revision

    :param cluster_uuid: uuid of the cluster where the instance runs
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference; the version is separated from it with
        _split_version and the repository with _split_repo
    :param atomic: passed to the upgrade command; when set, the status is also
        written by a parallel task while helm runs
    :param timeout: passed to the upgrade command
    :param params: values handed to _params_to_file_option
    :param db_dict: passed to _store_status
    :param namespace: namespace of the instance; when empty it is taken from the
        instance information
    :param force: passed to the upgrade command
    :return: revision number reported for the instance after the upgrade, or 0
        when the instance information cannot be obtained
    :raises K8sException: when the instance is not found or the helm command
        returns a non-zero code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # set namespace: look for the instance to obtain it when it is not given
    if not namespace:
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))
        namespace = instance_info["namespace"]

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    # From here on the temporal values file may exist: the finally clause makes
    # sure it is removed as well when one of the steps raises.
    try:
        # version
        kdu_model, version = self._split_version(kdu_model)

        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_uuid, repo)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
            force,
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # cancel status task, also when the wait itself is interrupted
                status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="upgrade",
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
599
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    cluster_uuid: str = None,
    kdu_model: str = None,
    atomic: bool = True,
    db_dict: dict = None,
    **kwargs,
):
    """Scale a resource in a Helm Chart.

    Args:
        kdu_instance: KDU instance name
        scale: Scale to which to set the resource
        resource_name: Resource name
        total_timeout: The time, in seconds, to wait
        cluster_uuid: The UUID of the cluster
        kdu_model: The chart reference
        atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        db_dict: Dictionary for any additional data
        kwargs: Additional parameters

    Returns:
        True once the scale command has finished with return code 0

    Raises:
        K8sException: if the instance is not found or the command returns a
            non-zero code
    """

    if resource_name:
        debug_mgs = "scaling resource {} in model {} (cluster {})".format(
            resource_name, kdu_model, cluster_uuid
        )
    else:
        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
    self.log.debug(debug_mgs)

    # look for instance to obtain namespace
    # get_instance_info function calls the sync command
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # version
    kdu_model, version = self._split_version(kdu_model)

    repo_url = await self._find_repo(kdu_model, cluster_uuid)

    _, replica_str = await self._get_replica_count_url(
        kdu_model, repo_url, resource_name
    )

    instance_namespace = instance_info["namespace"]
    command = self._get_upgrade_scale_command(
        kdu_model,
        kdu_instance,
        instance_namespace,
        scale,
        version,
        atomic,
        replica_str,
        total_timeout,
        resource_name,
        paths["kube_config"],
    )

    self.log.debug(f"scaling: {command}")

    # the same status report is written while helm runs (atomic only) and once
    # more when it has finished
    status_kwargs = dict(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_namespace,
        db_dict=db_dict,
        operation="scale",
    )

    if not atomic:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=helm_env
        )
    else:
        # exec helm in a task
        exec_task = asyncio.ensure_future(
            self._local_async_exec(
                command=command, raise_exception_on_error=False, env=helm_env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(self._store_status(**status_kwargs))

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()
        output, rc = exec_task.result()

    # write final status
    await self._store_status(**status_kwargs)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return True
aktas2962f3e2021-03-15 11:05:35 +0300721
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    cluster_uuid: str,
    kdu_model: str,
    **kwargs,
) -> int:
    """Get a resource scale count.

    Args:
        cluster_uuid: The UUID of the cluster
        resource_name: Resource name
        kdu_instance: KDU instance name
        kdu_model: The name or path of an Helm Chart
        kwargs: Additional parameters

    Returns:
        Resource instance count

    Raises:
        K8sException: if the instance is not found, or if no replica count is
            obtained from the instance nor from the chart
    """

    self.log.debug(
        f"getting scale count for {kdu_model} in cluster {cluster_uuid}"
    )

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths (the environment is not needed here)
    paths, _ = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    replicas = await self._get_replica_count_instance(
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        kubeconfig=paths["kube_config"],
        resource_name=resource_name,
    )

    self.log.debug(
        "Number of replicas of the KDU instance {} and resource {} obtained: {}".format(
            kdu_instance, resource_name, replicas
        )
    )

    if replicas is not None:
        return int(replicas)

    # Get default value if scale count is not found from provided values
    # Important note: this piece of code shall only be executed in the first scaling operation,
    # since it is expected that the _get_replica_count_instance is able to obtain the number of
    # replicas when a scale operation was already conducted previously for this KDU/resource!
    repo_url = await self._find_repo(kdu_model=kdu_model, cluster_uuid=cluster_uuid)
    replicas, _ = await self._get_replica_count_url(
        kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
    )

    self.log.debug(
        "Number of replicas of the Helm Chart package for KDU instance {} and resource "
        "{} obtained: {}".format(kdu_instance, resource_name, replicas)
    )

    if replicas is None:
        msg = "Replica count not found. Cannot be scaled"
        self.log.error(msg)
        raise K8sException(msg)

    return int(replicas)
aktas2962f3e2021-03-15 11:05:35 +0300791
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Rolls a KDU instance back by running the helm rollback command. The
    operation status is stored while the command runs and once more after it
    has finished.

    :param cluster_uuid: cluster identifier; also the path synced through self.fs
    :param kdu_instance: name of the KDU instance to roll back
    :param revision: revision handed to the rollback command
    :param db_dict: forwarded unchanged to _store_status
    :return: revision reported for the instance after the rollback, or 0 when
        the instance is no longer listed
    :raises K8sException: if the instance is not found or the command returns
        a non-zero code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # bring the local copy of the cluster directory up to date
    self.fs.sync(from_path=cluster_uuid)

    # the deployed instance tells us which namespace to act on
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    namespace = instance_info["namespace"]
    command = self._get_rollback_command(
        kdu_instance, namespace, revision, paths["kube_config"]
    )
    self.log.debug("rolling_back: {}".format(command))

    # arguments shared by the background and the final status writes
    status_kwargs = dict(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="rollback",
    )

    # helm runs in one task while a second task writes the status
    exec_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    status_task = asyncio.ensure_future(self._store_status(**status_kwargs))

    # only the helm task is awaited; the status writer is then cancelled
    await asyncio.wait([exec_task])
    status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(**status_kwargs)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    # report the revision the instance is at now
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0
    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
875
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance by running the helm uninstall/delete
    command built by the subclass (this call should happen after all
    _terminate-config-primitive_ of the VNF are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance is not found (nothing to do); otherwise
        the command output parsed by _output_to_table
    """
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_uuid)
    )

    # refresh the local copy of the cluster directory
    self.fs.sync(from_path=cluster_uuid)

    # the namespace comes from the deployed instance
    found = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not found:
        # a missing instance is treated as already uninstalled
        self.log.warning("kdu_instance {} not found".format(kdu_instance))
        return True

    cfg_paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    cmd = self._get_uninstall_command(
        kdu_instance, found["namespace"], cfg_paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=cmd, raise_exception_on_error=True, env=helm_env
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(output)
921
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Returns the releases deployed in a cluster, as produced by the
    helm-version-specific _instances_list, syncing the local cluster
    directory before and after the call.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever _instances_list returns for the cluster
    """
    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # pull, query, push back
    self.fs.sync(from_path=cluster_uuid)
    releases = await self._instances_list(cluster_uuid)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
942
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Looks up one release of a cluster by name.

    :param cluster_uuid: cluster whose releases are listed
    :param kdu_instance: release name to look for
    :return: the first listed instance whose "name" equals kdu_instance, or
        None when there is no such instance
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    match = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if match is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return match
950
async def upgrade_charm(
    self,
    ee_id: str = None,
    path: str = None,
    charm_id: str = None,
    charm_type: str = None,
    timeout: float = None,
) -> str:
    """Charm upgrade entry point; not supported for Helm-based KDUs.

    The arguments are accepted but never used: the method always raises.

    Args:
        ee_id: Execution environment id
        path: Local path to the charm
        charm_id: charm-id
        charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
        timeout: (Float) Timeout for the ns update operation

    Raises:
        K8sException: always
    """
    not_supported = "KDUs deployed with Helm do not support charm upgrade"
    raise K8sException(not_supported)
972
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action); not supported for Helm-based KDUs.

    The arguments are accepted but never used: the method always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    not_supported = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(not_supported)
999
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    The service names come from _get_services and each one is then resolved,
    one after the other, through _get_service.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
            name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # only the paths are needed here; the helm env is not used
    paths, _env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # names of the services that belong to the kdu
    names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    # resolve each name sequentially, keeping the order of the names
    service_list = [
        await self._get_service(cluster_uuid, name, namespace) for name in names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
1047
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Returns the data of one service, as produced by _get_service, syncing the
    local cluster directory before and after the call.

    :param cluster_uuid: cluster the service lives in
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: whatever _get_service returns for that service
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # pull, query, push back
    self.fs.sync(from_path=cluster_uuid)
    found = await self._get_service(cluster_uuid, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return found
1067
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
) -> Union[str, dict]:
    """
    Retrieves the current state of a given KDU instance. The instance is
    first located in the cluster to obtain its namespace, then the
    helm-version-specific `_status_kdu` is called (based on the `status`
    call), which allows retrieving the _composition_ (i.e. K8s objects) and
    _specific values_ of the configuration parameters applied to the instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param yaml_format: if the return shall be returned as an YAML string or as a
        dictionary
    :param kwargs: Additional parameters (None yet)
    :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
              - UNKNOWN
              - DEPLOYED
              - DELETED
              - SUPERSEDED
              - FAILED or
              - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
              and the status of those resources
        - Last `deployment_time`.
    :raises K8sException: if the instance is not deployed in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    instances = await self._instances_list(cluster_id=cluster_uuid)
    instance = next(
        (item for item in instances if item.get("name") == kdu_instance), None
    )
    if instance is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
1130
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtains the values of a deployed KDU instance by delegating to
    _exec_get_command with the "values" sub-command.

    :param kdu_instance: name of the KDU instance
    :param namespace: namespace passed on to the get command
    :param kubeconfig: kubeconfig passed on to the get command
    :return: whatever _exec_get_command returns
    """
    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    values = await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
    return values
1143
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Method to obtain the Helm Chart package's values

    Delegates to _exec_inspect_command with the "values" sub-command.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the values of the Helm Chart package
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    chart_values = await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_values
1164
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Method to obtain the Helm Chart package's readme

    Delegates to _exec_inspect_command with the "readme" sub-command.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        whatever _exec_inspect_command returns for the readme
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
1174
async def synchronize_repos(self, cluster_uuid: str):
    """
    Aligns the helm repos configured locally for a cluster with the repos
    registered for that cluster in the database.

    Repos present in the database but missing locally (or present with a
    different url) are (re)added; repos present locally but not in the
    database are removed, except the one named "stable".

    :param cluster_uuid: cluster whose repos are synchronized
    :return: tuple (deleted_repo_list, added_repo_dict). deleted_repo_list
        holds the database id of every repo removed because its url changed
        and the name of every local-only repo removed; added_repo_dict maps
        database repo id to repo name for every repo added
    :raises K8sException: if adding (or re-adding) a database repo fails
    :raises Exception: on any other failure while synchronizing
    """
    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # bound before the try so the error message below never reports
            # a stale or undefined id
            repo_id = None
            try:
                repo_id = db_repo.get("_id")
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and and again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    if "ca_cert" in db_repo:
                        await self.repo_add(
                            cluster_uuid,
                            db_repo["name"],
                            db_repo["url"],
                            cert=db_repo["ca_cert"],
                        )
                    else:
                        await self.repo_add(
                            cluster_uuid,
                            db_repo["name"],
                            db_repo["url"],
                        )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repo that cannot be removed is only
                    # reported and the synchronization goes on
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other failure is logged and re-raised as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
1250
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001251 def _get_db_repos_dict(self, repo_ids: list):
1252 db_repos_dict = {}
1253 for repo_id in repo_ids:
1254 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1255 db_repos_dict[db_repo["name"]] = db_repo
1256 return db_repos_dict
1257
1258 """
1259 ####################################################################################
1260 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1261 ####################################################################################
1262 """
1263
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates base cluster and kube dirs and returns them.
    Also creates helm3 dirs according to new directory specification, paths are
    not returned but assigned to helm environment variables

    Abstract: to be implemented by the subclasses. Callers in this class
    unpack the result as ``paths, env`` and read ``paths["kube_config"]``.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably whether missing directories are
        created -- confirm against the subclass implementations
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
1274
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    Abstract: to be implemented by the subclasses.
    NOTE(review): paths and env look like the pair returned by
    _init_paths_env -- confirm against the caller.
    """
1280
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    Abstract: to be implemented by the subclasses. Callers in this class
    iterate over the result and read each item's "name" (via ``.get``) and
    "namespace" entries.
    """
1286
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    Abstract: to be implemented by the subclasses. get_services iterates over
    the result and passes every element to _get_service as the service name.
    """
1292
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict]:
    """
    Implements the helm version dependent method to obtain status of a helm instance

    Abstract: to be implemented by the subclasses. status_kdu returns the
    result of this method unchanged.

    :param cluster_id: cluster where the instance is deployed
    :param kdu_instance: name of the KDU instance
    :param namespace: namespace of the instance
    :param yaml_format: presumably selects a YAML string instead of a
        dictionary as return value (per the status_kdu docstring) -- confirm
        in the subclasses
    :param show_error_log: presumably forwarded to the command execution to
        control error logging -- confirm in the subclasses
    """
1305
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance

    Abstract: to be implemented by the subclasses.
    NOTE(review): the parameters appear to have the same meaning as the
    equally named ones documented on _get_upgrade_command -- confirm.
    """
1321
@abc.abstractmethod
def _get_upgrade_scale_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    count,
    version,
    atomic,
    replicas,
    timeout,
    resource_name,
    kubeconfig,
) -> str:
    """Generates the command to scale a Helm Chart release

    Abstract: to be implemented by the subclasses.

    Args:
        kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
        kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
        namespace (str): Namespace where this KDU instance is deployed
        count (int): Scale count
        version (str): Constraint with specific version of the Chart to use
        atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        replicas (str): The key under resource_name key where the scale count is stored
        timeout (float): The time, in seconds, to wait
        resource_name (str): The KDU's resource to scale
        kubeconfig (str): Kubeconfig file path

    Returns:
        str: command to scale a Helm Chart release
    """
aktas867418c2021-10-19 18:26:13 +03001354
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
    force,
) -> str:
    """Generates the command to upgrade a Helm Chart release

    Abstract: to be implemented by the subclasses.

    Args:
        kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
        kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
        namespace (str): Namespace where this KDU instance is deployed
        params_str (str): Params used to upgrade the Helm Chart release
        version (str): Constraint with specific version of the Chart to use
        atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        timeout (float): The time, in seconds, to wait
        kubeconfig (str): Kubeconfig file path
        force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

    Returns:
        str: command to upgrade a Helm Chart release
    """
1384
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    Abstract: to be implemented by the subclasses. rollback calls it with the
    instance name, the instance's namespace, the requested revision and
    ``paths["kube_config"]``, and executes the returned command.
    """
1392
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    Abstract: to be implemented by the subclasses. uninstall calls it with
    the instance name, the instance's namespace and ``paths["kube_config"]``,
    and executes the returned command.
    """
1400
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """Generates the command to obtain the information about an Helm Chart package
        (`helm show ...` command)

    Abstract: to be implemented by the subclasses.

    Args:
        show_command: the second part of the command (`helm show <show_command>`)
        kdu_model: The name or path of an Helm Chart
        repo_str: Helm Chart repository option -- presumably the command line
            fragment built from the repository url; confirm against the caller
        version: constraint with specific version of the Chart to use

    Returns:
        str: the generated Helm Chart command
    """
1417
@abc.abstractmethod
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """Obtain command to be executed to get information about the kdu instance.

    Abstract: to be implemented by the subclasses.
    NOTE(review): get_command is presumably the helm `get` sub-command
    (get_values_kdu passes "values" to _exec_get_command) -- confirm.
    """
1423
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    of helm version
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    Abstract: to be implemented by the subclasses.
    """
1432
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    Abstract: to be implemented by the subclasses. synchronize_repos passes
    the returned list to _get_db_repos_dict, which looks every element up as
    the "_id" of a "k8srepos" record.
    """
1438
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001439 """
1440 ####################################################################################
1441 ################################### P R I V A T E ##################################
1442 ####################################################################################
1443 """
1444
1445 @staticmethod
1446 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1447 if os.path.exists(filename):
1448 return True
1449 else:
1450 msg = "File {} does not exist".format(filename)
1451 if exception_if_not_exists:
1452 raise K8sException(msg)
1453
1454 @staticmethod
1455 def _remove_multiple_spaces(strobj):
1456 strobj = strobj.strip()
1457 while " " in strobj:
1458 strobj = strobj.replace(" ", " ")
1459 return strobj
1460
1461 @staticmethod
1462 def _output_to_lines(output: str) -> list:
1463 output_lines = list()
1464 lines = output.splitlines(keepends=False)
1465 for line in lines:
1466 line = line.strip()
1467 if len(line) > 0:
1468 output_lines.append(line)
1469 return output_lines
1470
1471 @staticmethod
1472 def _output_to_table(output: str) -> list:
1473 output_table = list()
1474 lines = output.splitlines(keepends=False)
1475 for line in lines:
1476 line = line.replace("\t", " ")
1477 line_list = list()
1478 output_table.append(line_list)
1479 cells = line.split(sep=" ")
1480 for cell in cells:
1481 cell = cell.strip()
1482 if len(cell) > 0:
1483 line_list.append(cell)
1484 return output_table
1485
1486 @staticmethod
1487 def _parse_services(output: str) -> list:
1488 lines = output.splitlines(keepends=False)
1489 services = []
1490 for line in lines:
1491 line = line.replace("\t", " ")
1492 cells = line.split(sep=" ")
1493 if len(cells) > 0 and cells[0].startswith("service/"):
1494 elems = cells[0].split(sep="/")
1495 if len(elems) > 1:
1496 services.append(elems[1])
1497 return services
1498
1499 @staticmethod
1500 def _get_deep(dictionary: dict, members: tuple):
1501 target = dictionary
1502 value = None
1503 try:
1504 for m in members:
1505 value = target.get(m)
1506 if not value:
1507 return None
1508 else:
1509 target = value
1510 except Exception:
1511 pass
1512 return value
1513
1514 # find key:value in several lines
1515 @staticmethod
1516 def _find_in_lines(p_lines: list, p_key: str) -> str:
1517 for line in p_lines:
1518 try:
1519 if line.startswith(p_key + ":"):
1520 parts = line.split(":")
1521 the_value = parts[1].strip()
1522 return the_value
1523 except Exception:
1524 # ignore it
1525 pass
1526 return None
1527
1528 @staticmethod
1529 def _lower_keys_list(input_list: list):
1530 """
1531 Transform the keys in a list of dictionaries to lower case and returns a new list
1532 of dictionaries
1533 """
1534 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001535 if input_list:
1536 for dictionary in input_list:
1537 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1538 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001539 return new_list
1540
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001541 async def _local_async_exec(
1542 self,
1543 command: str,
1544 raise_exception_on_error: bool = False,
1545 show_error_log: bool = True,
1546 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001547 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001548 ) -> (str, int):
1549
1550 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001551 self.log.debug(
1552 "Executing async local command: {}, env: {}".format(command, env)
1553 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001554
1555 # split command
1556 command = shlex.split(command)
1557
1558 environ = os.environ.copy()
1559 if env:
1560 environ.update(env)
1561
1562 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001563 async with self.cmd_lock:
1564 process = await asyncio.create_subprocess_exec(
1565 *command,
1566 stdout=asyncio.subprocess.PIPE,
1567 stderr=asyncio.subprocess.PIPE,
1568 env=environ,
1569 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001570
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001571 # wait for command terminate
1572 stdout, stderr = await process.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001573
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001574 return_code = process.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001575
1576 output = ""
1577 if stdout:
1578 output = stdout.decode("utf-8").strip()
1579 # output = stdout.decode()
1580 if stderr:
1581 output = stderr.decode("utf-8").strip()
1582 # output = stderr.decode()
1583
1584 if return_code != 0 and show_error_log:
1585 self.log.debug(
1586 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1587 )
1588 else:
1589 self.log.debug("Return code: {}".format(return_code))
1590
1591 if raise_exception_on_error and return_code != 0:
1592 raise K8sException(output)
1593
1594 if encode_utf8:
1595 output = output.encode("utf-8").strip()
1596 output = str(output).replace("\\n", "\n")
1597
1598 return output, return_code
1599
1600 except asyncio.CancelledError:
Pedro Escaleirad3817992022-07-23 23:34:42 +01001601 # first, kill the process if it is still running
1602 if process.returncode is None:
1603 process.kill()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001604 raise
1605 except K8sException:
1606 raise
1607 except Exception as e:
1608 msg = "Exception executing command: {} -> {}".format(command, e)
1609 self.log.error(msg)
1610 if raise_exception_on_error:
1611 raise K8sException(e) from e
1612 else:
1613 return "", -1
1614
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """Run ``command1 | command2`` as two local subprocesses joined by an OS pipe.

    Both commands are normalised with ``_remove_multiple_spaces`` and tokenised
    with ``shlex.split``; no shell is involved. The subprocesses are spawned
    and awaited while holding ``self.cmd_lock``.

    :param command1: command whose standard output is written to the pipe
    :param command2: command that reads the pipe on its standard input
    :param raise_exception_on_error: when True, a non-zero return code of the
        second command, or any failure while running the commands, raises
        K8sException; when False, a failure while running returns ("", -1)
    :param show_error_log: when True, a non-zero return code is logged together
        with the captured output
    :param encode_utf8: when True, the output is encoded to bytes and converted
        back with ``str()``, restoring escaped newlines
    :param env: extra environment variables layered over a copy of os.environ
    :return: tuple (output, return_code) of the second command
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # Bound before the try block so that the handlers below never touch an
    # undefined name when the failure/cancellation happens before the spawn.
    process_1 = None
    process_2 = None
    read_fd = None
    write_fd = None

    try:
        async with self.cmd_lock:
            read_fd, write_fd = os.pipe()
            process_1 = await asyncio.create_subprocess_exec(
                *command1, stdout=write_fd, env=environ
            )
            # the child holds its own copy of the descriptor
            os.close(write_fd)
            write_fd = None
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read_fd, stdout=asyncio.subprocess.PIPE, env=environ
            )
            os.close(read_fd)
            read_fd = None
            stdout, stderr = await process_2.communicate()

            return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        # stderr is not redirected to a pipe above, so communicate() is
        # expected to return None for it; kept for parity with
        # _local_async_exec
        if stderr:
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        # first, kill the processes if they are still running
        for process in (process_1, process_2):
            if process is not None and process.returncode is None:
                process.kill()
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
    finally:
        # Release pipe ends that were not handed over because a spawn failed
        # or the task was cancelled half-way.
        for fd in (read_fd, write_fd):
            if fd is not None:
                try:
                    os.close(fd)
                except OSError:
                    pass
1693
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The service is read with ``kubectl get service ... -o=yaml`` using the
    kubeconfig of the cluster, and the YAML output is parsed with the safe
    loader.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (only present for services of type
    LoadBalancer; empty while no ingress ip has been assigned)
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        # The ingress list can be missing while the load balancer is still
        # being provisioned, and an entry may not carry an "ip" key (presumably
        # hostname-only entries -- verify against the target clusters).
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list or [] if "ip" in elem
        ]

    return service
1737
async def _exec_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """Run the command built by ``_get_get_command`` and return its output.

    Args:
        get_command: sub command handed to ``_get_get_command``
        kdu_instance: name of the KDU instance
        namespace: namespace of the KDU instance
        kubeconfig: kubeconfig handed to ``_get_get_command``

    Returns:
        The output of the executed command; the return code is discarded
    """

    command_output, _ = await self._local_async_exec(
        command=self._get_get_command(
            get_command, kdu_instance, namespace, kubeconfig
        )
    )
    return command_output
1750
async def _exec_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """Obtains information about an Helm Chart package (`helm show` command)

    Args:
        inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the requested info about the Helm Chart package
    """

    repo_str = ""
    if repo_url:
        repo_str = " --repo {}".format(repo_url)

    # Obtain the Chart's name and store it in the var kdu_model.
    # _split_repo returns None when the model has no "<repo>/" prefix; in
    # that case the model is already the chart name and is kept as is.
    chart_name, _ = self._split_repo(kdu_model=kdu_model)
    if chart_name is not None:
        kdu_model = chart_name

    kdu_model, version = self._split_version(kdu_model)
    if version:
        version_str = "--version {}".format(version)
    else:
        version_str = ""

    full_command = self._get_inspect_command(
        show_command=inspect_command,
        kdu_model=kdu_model,
        repo_str=repo_str,
        version=version_str,
    )

    output, _ = await self._local_async_exec(command=full_command)

    return output
1788
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str = None,
    resource_name: str = None,
) -> (int, str):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica count is looked
            up under this key of the values instead of at the top level

    Returns:
        A tuple with:
        - The number of replicas found in the chart values; and
        - The string corresponding to the replica count key in the Helm values
          ("replicaCount" or "replicas")

    Raises:
        K8sException: if the values are empty, the resource is not found, no
            replica key has a value, or both replica keys are present
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
        Loader=yaml.SafeLoader,
    )

    self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

        if not kdu_values:
            msg = "resource {} not found in the values in model {}".format(
                resource_name, kdu_model
            )
            self.log.error(msg)
            raise K8sException(msg)

    # "replicaCount" takes precedence over "replicas"
    if kdu_values.get("replicaCount") is not None:
        replica_str = "replicaCount"
    elif kdu_values.get("replicas") is not None:
        replica_str = "replicas"
    else:
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    replicas = kdu_values[replica_str]

    # Control if replicas and replicaCount exists at the same time (a key that
    # is present with a null value still counts as existing)
    if "replicaCount" in kdu_values and "replicas" in kdu_values:
        msg = "replicaCount and replicas are exists at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    return replicas, replica_str
1870
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
) -> int:
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: kubeconfig handed to ``get_values_kdu``
        resource_name: Resource name; when its section exists and is not
            empty it is searched instead of the top-level values

    Returns:
        The number of replicas of the specific instance; if not found, returns None
    """

    raw_values = await self.get_values_kdu(kdu_instance, namespace, kubeconfig)
    kdu_values = yaml.load(raw_values, Loader=yaml.SafeLoader)

    self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")

    if not kdu_values:
        return None

    scoped_values = kdu_values.get(resource_name, None) if resource_name else None
    lookup = scoped_values if scoped_values else kdu_values

    # "replicaCount" is tried first, then "replicas"
    for key in ("replicaCount", "replicas"):
        found = lookup.get(key)
        if found is not None:
            return found

    return None
1914
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    db_dict: dict = None,
) -> None:
    """
    Reads the status of a Helm based KDU instance and writes it to the database.

    Any exception, including the cancellation of the task, is logged as a
    warning and not propagated.

    :param cluster_id (str): the cluster where the KDU instance is deployed
    :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
    :param kdu_instance (str): The KDU instance in relation to which the status is obtained
    :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
    :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
        values for the keys:
            - "collection": The Mongo DB collection to write to
            - "filter": The query filter to use in the update process
            - "path": The dot separated keys which targets the object to be updated
        Defaults to None.
    """

    try:
        kdu_status = await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            yaml_format=False,
            namespace=namespace,
        )

        info = kdu_status.get("info")
        description = info.get("description")
        self.log.debug(f"Status for KDU {kdu_instance} obtained: {description}.")

        # write status to db
        written = await self.write_app_status_to_db(
            db_dict=db_dict,
            status=str(description),
            detailed_status=str(kdu_status),
            operation=operation,
        )

        if not written:
            self.log.info("Error writing in database. Task exiting...")

    except asyncio.CancelledError as cancelled:
        self.log.warning(
            f"Exception in method {self._store_status.__name__} (task cancelled): {cancelled}"
        )
    except Exception as error:
        self.log.warning(f"Exception in method {self._store_status.__name__}: {error}")
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001966
1967 # params for use in -f file
1968 # returns values file option and filename (in order to delete it at the end)
1969 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1970
1971 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001972 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001973
1974 def get_random_number():
1975 r = random.randrange(start=1, stop=99999999)
1976 s = str(r)
1977 while len(s) < 10:
1978 s = "0" + s
1979 return s
1980
1981 params2 = dict()
1982 for key in params:
1983 value = params.get(key)
1984 if "!!yaml" in str(value):
David Garcia513cb2d2022-05-31 11:01:09 +02001985 value = yaml.safe_load(value[7:])
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001986 params2[key] = value
1987
1988 values_file = get_random_number() + ".yaml"
1989 with open(values_file, "w") as stream:
1990 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1991
1992 return "-f {}".format(values_file), values_file
1993
1994 return "", None
1995
1996 # params for use in --set option
1997 @staticmethod
1998 def _params_to_set_option(params: dict) -> str:
1999 params_str = ""
2000 if params and len(params) > 0:
2001 start = True
2002 for key in params:
2003 value = params.get(key, None)
2004 if value is not None:
2005 if start:
2006 params_str += "--set "
2007 start = False
2008 else:
2009 params_str += ","
2010 params_str += "{}={}".format(key, value)
2011 return params_str
2012
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """Generate a randomised instance name from the chart in ``kdu_model``.

    The name is built from the chart name (for an absolute path or a URL, its
    last path component) with every character that is neither a letter nor a
    digit replaced by "-", cut to 35 characters, prefixed with "a" when it
    does not start with a letter, and followed by "-" and a zero-padded
    10 digit random number. The result is lower-cased.

    :param kwargs: must contain the key ``kdu_model`` (str)
    :return: the generated instance name
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: extract the last portion. Trailing
    # slashes are ignored so that a directory path does not give an empty name.
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name.rstrip("/").rsplit("/", 1)[-1]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )[:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    random_suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")

    return (name + "-" + random_suffix).lower()
aktas867418c2021-10-19 18:26:13 +03002048
2049 def _split_version(self, kdu_model: str) -> (str, str):
2050 version = None
garciadeblas04393192022-06-08 15:39:24 +02002051 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
aktas867418c2021-10-19 18:26:13 +03002052 parts = kdu_model.split(sep=":")
2053 if len(parts) == 2:
2054 version = str(parts[1])
2055 kdu_model = parts[0]
2056 return kdu_model, version
2057
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002058 def _split_repo(self, kdu_model: str) -> (str, str):
2059 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2060
2061 Args:
2062 kdu_model (str): Associated KDU model
2063
2064 Returns:
2065 (str, str): Tuple with the Chart name in index 0, and the repo name
2066 in index 2; if there was a problem finding them, return None
2067 for both
2068 """
2069
2070 chart_name = None
garciadeblas7faf4ec2022-04-08 22:53:25 +02002071 repo_name = None
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002072
garciadeblas7faf4ec2022-04-08 22:53:25 +02002073 idx = kdu_model.find("/")
2074 if idx >= 0:
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002075 chart_name = kdu_model[idx + 1 :]
garciadeblas7faf4ec2022-04-08 22:53:25 +02002076 repo_name = kdu_model[:idx]
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002077
2078 return chart_name, repo_name
garciadeblas7faf4ec2022-04-08 22:53:25 +02002079
async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
    """Obtain the Helm repository URL for an Helm Chart

    Args:
        kdu_model (str): the KDU model associated with the Helm Chart instantiation
        cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation

    Returns:
        str: the URL of the first repository returned by ``repo_list`` whose
             name matches the repo part of the model; None when the model has
             no repo part or no repository matches
    """

    _chart_name, repo_name = self._split_repo(kdu_model=kdu_model)
    if not repo_name:
        return None

    # Find repository link; the search stops at the first match
    known_repos = await self.repo_list(cluster_uuid)
    return next(
        (entry["url"] for entry in known_repos if entry["name"] == repo_name),
        None,
    )
Gabriel Cubafb03e902022-10-07 11:40:03 -05002103
async def create_certificate(
    self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
):
    """Create a certificate in the cluster through ``Kubectl``.

    The request is issued with the kubeconfig of the given cluster, the single
    ``usage`` wrapped in a list, and the issuer name fixed to "ca-issuer".

    :param cluster_uuid: cluster whose kubeconfig is used
    :param namespace: namespace passed to ``Kubectl.create_certificate``
    :param dns_prefix: DNS prefix passed to ``Kubectl.create_certificate``
    :param name: name of the certificate
    :param secret_name: name of the secret passed along with the request
    :param usage: single usage of the certificate
    """
    paths, _env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )
    certificate_args = {
        "namespace": namespace,
        "name": name,
        "dns_prefix": dns_prefix,
        "secret_name": secret_name,
        "usages": [usage],
        "issuer_name": "ca-issuer",
    }
    client = Kubectl(config_file=paths["kube_config"])
    await client.create_certificate(**certificate_args)
2119
async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
    """Delete a certificate from the cluster through ``Kubectl``.

    :param cluster_uuid: cluster whose kubeconfig is used
    :param namespace: namespace passed to ``Kubectl.delete_certificate``
    :param certificate_name: name of the certificate to delete
    """
    paths, _env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )
    kube_config_file = paths["kube_config"]
    client = Kubectl(config_file=kube_config_file)
    await client.delete_certificate(namespace, certificate_name)