blob: e89d6fa2d68a1aac9e2d85877275372ad79684c3 [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
David Garcia4395cfa2021-05-28 16:21:51 +020073 self.config = EnvironConfig()
lloretgalleg1c83f2e2020-10-22 09:12:35 +000074 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
David Garcia4395cfa2021-05-28 16:21:51 +020089 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
lloretgalleg83e55892020-12-17 12:42:11 +000092
Pedro Escaleira1f222a92022-06-20 15:40:43 +010093 # Lock to avoid concurrent execution of helm commands
94 self.cmd_lock = asyncio.Lock()
95
Pedro Escaleirab41de172022-04-02 00:44:08 +010096 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000097 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010098 Obtains the namespace used by the cluster with the uuid passed by argument
99
100 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000101 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100102
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster = self.db.get_one(
105 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
106 )
107 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000108
109 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100110 self,
111 k8s_creds: str,
112 namespace: str = "kube-system",
113 reuse_cluster_uuid=None,
114 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000115 ) -> (str, bool):
116 """
117 It prepares a given K8s cluster environment to run Charts
118
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
120 '.kube/config'
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200124 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
128 """
129
130 if reuse_cluster_uuid:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100131 cluster_id = reuse_cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000132 else:
133 cluster_id = str(uuid4())
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000134
garciadeblas82b591c2021-03-24 09:22:13 +0100135 self.log.debug(
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
137 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000138
139 paths, env = self._init_paths_env(
140 cluster_name=cluster_id, create_if_not_exist=True
141 )
142 mode = stat.S_IRUSR | stat.S_IWUSR
143 with open(paths["kube_config"], "w", mode) as f:
144 f.write(k8s_creds)
145 os.chmod(paths["kube_config"], 0o600)
146
147 # Code with initialization specific of helm version
148 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
149
150 # sync fs with local data
151 self.fs.reverse_sync(from_path=cluster_id)
152
153 self.log.info("Cluster {} initialized".format(cluster_id))
154
Pedro Escaleirab41de172022-04-02 00:44:08 +0100155 return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000156
157 async def repo_add(
bravof0ab522f2021-11-23 19:33:18 -0300158 self,
159 cluster_uuid: str,
160 name: str,
161 url: str,
162 repo_type: str = "chart",
163 cert: str = None,
164 user: str = None,
165 password: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000166 ):
garciadeblas82b591c2021-03-24 09:22:13 +0100167 self.log.debug(
168 "Cluster {}, adding {} repository {}. URL: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100169 cluster_uuid, repo_type, name, url
garciadeblas82b591c2021-03-24 09:22:13 +0100170 )
171 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000172
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000173 # init_env
174 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100175 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000176 )
177
bravof7bd5c6a2021-11-17 11:14:57 -0300178 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100179 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300180
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000181 # helm repo add name url
bravof0ab522f2021-11-23 19:33:18 -0300182 command = ("env KUBECONFIG={} {} repo add {} {}").format(
bravof7bd5c6a2021-11-17 11:14:57 -0300183 paths["kube_config"], self._helm_command, name, url
184 )
bravof0ab522f2021-11-23 19:33:18 -0300185
186 if cert:
187 temp_cert_file = os.path.join(
Pedro Escaleira1188b5d2022-04-22 18:51:00 +0100188 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
bravof0ab522f2021-11-23 19:33:18 -0300189 )
190 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
191 with open(temp_cert_file, "w") as the_cert:
192 the_cert.write(cert)
193 command += " --ca-file {}".format(temp_cert_file)
194
195 if user:
196 command += " --username={}".format(user)
197
198 if password:
199 command += " --password={}".format(password)
200
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000201 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100202 await self._local_async_exec(
203 command=command, raise_exception_on_error=True, env=env
204 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000205
garciadeblasd4cee8c2022-05-04 10:57:36 +0200206 # helm repo update
garciadeblas069f0a32022-05-04 11:07:41 +0200207 command = "env KUBECONFIG={} {} repo update {}".format(
208 paths["kube_config"], self._helm_command, name
garciadeblasd4cee8c2022-05-04 10:57:36 +0200209 )
210 self.log.debug("updating repo: {}".format(command))
211 await self._local_async_exec(
212 command=command, raise_exception_on_error=False, env=env
213 )
214
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000215 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100216 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000217
garciadeblas7faf4ec2022-04-08 22:53:25 +0200218 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
219 self.log.debug(
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid, repo_type, name
222 )
223 )
224
225 # init_env
226 paths, env = self._init_paths_env(
227 cluster_name=cluster_uuid, create_if_not_exist=True
228 )
229
230 # sync local dir
231 self.fs.sync(from_path=cluster_uuid)
232
233 # helm repo update
234 command = "{} repo update {}".format(self._helm_command, name)
235 self.log.debug("updating repo: {}".format(command))
236 await self._local_async_exec(
237 command=command, raise_exception_on_error=False, env=env
238 )
239
240 # sync fs
241 self.fs.reverse_sync(from_path=cluster_uuid)
242
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000243 async def repo_list(self, cluster_uuid: str) -> list:
244 """
245 Get the list of registered repositories
246
247 :return: list of registered repositories: [ (name, url) .... ]
248 """
249
Pedro Escaleirab41de172022-04-02 00:44:08 +0100250 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000251
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000252 # config filename
253 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100254 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000255 )
256
bravof7bd5c6a2021-11-17 11:14:57 -0300257 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100258 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300259
260 command = "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths["kube_config"], self._helm_command
262 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000263
264 # Set exception to false because if there are no repos just want an empty list
265 output, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=False, env=env
267 )
268
269 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100270 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000271
272 if _rc == 0:
273 if output and len(output) > 0:
274 repos = yaml.load(output, Loader=yaml.SafeLoader)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self._lower_keys_list(repos)
277 else:
278 return []
279 else:
280 return []
281
282 async def repo_remove(self, cluster_uuid: str, name: str):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100283 self.log.debug(
284 "remove {} repositories for cluster {}".format(name, cluster_uuid)
285 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000286
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000287 # init env, paths
288 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100289 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000290 )
291
bravof7bd5c6a2021-11-17 11:14:57 -0300292 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100293 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300294
295 command = "env KUBECONFIG={} {} repo remove {}".format(
296 paths["kube_config"], self._helm_command, name
297 )
garciadeblas82b591c2021-03-24 09:22:13 +0100298 await self._local_async_exec(
299 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000300 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000301
302 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100303 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000304
305 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100306 self,
307 cluster_uuid: str,
308 force: bool = False,
309 uninstall_sw: bool = False,
310 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000311 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200312 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000313
David Garciaeb8943a2021-04-12 12:07:37 +0200314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
315
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to force the reset
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
321 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100322 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100323 self.log.debug(
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100325 cluster_uuid, uninstall_sw
garciadeblas82b591c2021-03-24 09:22:13 +0100326 )
327 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000328
329 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100330 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000331
332 # uninstall releases if needed.
333 if uninstall_sw:
334 releases = await self.instances_list(cluster_uuid=cluster_uuid)
335 if len(releases) > 0:
336 if force:
337 for r in releases:
338 try:
339 kdu_instance = r.get("name")
340 chart = r.get("chart")
341 self.log.debug(
342 "Uninstalling {} -> {}".format(chart, kdu_instance)
343 )
344 await self.uninstall(
345 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
346 )
347 except Exception as e:
348 # will not raise exception as it was found
349 # that in some cases of previously installed helm releases it
350 # raised an error
351 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100352 "Error uninstalling release {}: {}".format(
353 kdu_instance, e
354 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000355 )
356 else:
357 msg = (
358 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
Pedro Escaleirab41de172022-04-02 00:44:08 +0100359 ).format(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000360 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100361 uninstall_sw = (
362 False # Allow to remove k8s cluster without removing Tiller
363 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000364
365 if uninstall_sw:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100366 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000367
368 # delete cluster directory
Pedro Escaleirab41de172022-04-02 00:44:08 +0100369 self.log.debug("Removing directory {}".format(cluster_uuid))
370 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000371 # Remove also local directorio if still exist
Pedro Escaleirab41de172022-04-02 00:44:08 +0100372 direct = self.fs.path + "/" + cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000373 shutil.rmtree(direct, ignore_errors=True)
374
375 return True
376
garciadeblas04393192022-06-08 15:39:24 +0200377 def _is_helm_chart_a_file(self, chart_name: str):
378 return chart_name.count("/") > 1
379
lloretgalleg095392b2020-11-20 11:28:08 +0000380 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100381 self,
382 cluster_id: str,
383 kdu_model: str,
384 paths: dict,
385 env: dict,
386 kdu_instance: str,
387 atomic: bool = True,
388 timeout: float = 300,
389 params: dict = None,
390 db_dict: dict = None,
391 kdu_name: str = None,
392 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000393 ):
bravof7bd5c6a2021-11-17 11:14:57 -0300394 # init env, paths
395 paths, env = self._init_paths_env(
396 cluster_name=cluster_id, create_if_not_exist=True
397 )
398
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000399 # params to str
400 params_str, file_to_delete = self._params_to_file_option(
401 cluster_id=cluster_id, params=params
402 )
403
404 # version
aktas867418c2021-10-19 18:26:13 +0300405 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000406
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +0100407 _, repo = self._split_repo(kdu_model)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200408 if repo:
limon3c443f52022-07-21 13:55:55 +0200409 await self.repo_update(cluster_id, repo)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200410
garciadeblas82b591c2021-03-24 09:22:13 +0100411 command = self._get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300412 kdu_model,
413 kdu_instance,
414 namespace,
415 params_str,
416 version,
417 atomic,
418 timeout,
419 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100420 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000421
422 self.log.debug("installing: {}".format(command))
423
424 if atomic:
425 # exec helm in a task
426 exec_task = asyncio.ensure_future(
427 coro_or_future=self._local_async_exec(
428 command=command, raise_exception_on_error=False, env=env
429 )
430 )
431
432 # write status in another task
433 status_task = asyncio.ensure_future(
434 coro_or_future=self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000440 )
441 )
442
443 # wait for execution task
444 await asyncio.wait([exec_task])
445
446 # cancel status task
447 status_task.cancel()
448
449 output, rc = exec_task.result()
450
451 else:
452
453 output, rc = await self._local_async_exec(
454 command=command, raise_exception_on_error=False, env=env
455 )
456
457 # remove temporal values yaml file
458 if file_to_delete:
459 os.remove(file_to_delete)
460
461 # write final status
462 await self._store_status(
463 cluster_id=cluster_id,
464 kdu_instance=kdu_instance,
465 namespace=namespace,
466 db_dict=db_dict,
467 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000475 async def upgrade(
476 self,
477 cluster_uuid: str,
478 kdu_instance: str,
479 kdu_model: str = None,
480 atomic: bool = True,
481 timeout: float = 300,
482 params: dict = None,
483 db_dict: dict = None,
484 ):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100485 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000486
487 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100488 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000489
490 # look for instance to obtain namespace
491 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
492 if not instance_info:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance))
494
495 # init env, paths
496 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100497 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000498 )
499
bravof7bd5c6a2021-11-17 11:14:57 -0300500 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100501 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300502
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000503 # params to str
504 params_str, file_to_delete = self._params_to_file_option(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100505 cluster_id=cluster_uuid, params=params
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000506 )
507
508 # version
aktas867418c2021-10-19 18:26:13 +0300509 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000510
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +0100511 _, repo = self._split_repo(kdu_model)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200512 if repo:
limon3c443f52022-07-21 13:55:55 +0200513 await self.repo_update(cluster_uuid, repo)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200514
garciadeblas82b591c2021-03-24 09:22:13 +0100515 command = self._get_upgrade_command(
516 kdu_model,
517 kdu_instance,
518 instance_info["namespace"],
519 params_str,
520 version,
521 atomic,
522 timeout,
bravof7bd5c6a2021-11-17 11:14:57 -0300523 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100524 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000525
526 self.log.debug("upgrading: {}".format(command))
527
528 if atomic:
529
530 # exec helm in a task
531 exec_task = asyncio.ensure_future(
532 coro_or_future=self._local_async_exec(
533 command=command, raise_exception_on_error=False, env=env
534 )
535 )
536 # write status in another task
537 status_task = asyncio.ensure_future(
538 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100539 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000540 kdu_instance=kdu_instance,
541 namespace=instance_info["namespace"],
542 db_dict=db_dict,
543 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000544 )
545 )
546
547 # wait for execution task
548 await asyncio.wait([exec_task])
549
550 # cancel status task
551 status_task.cancel()
552 output, rc = exec_task.result()
553
554 else:
555
556 output, rc = await self._local_async_exec(
557 command=command, raise_exception_on_error=False, env=env
558 )
559
560 # remove temporal values yaml file
561 if file_to_delete:
562 os.remove(file_to_delete)
563
564 # write final status
565 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100566 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000567 kdu_instance=kdu_instance,
568 namespace=instance_info["namespace"],
569 db_dict=db_dict,
570 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000571 )
572
573 if rc != 0:
574 msg = "Error executing command: {}\nOutput: {}".format(command, output)
575 self.log.error(msg)
576 raise K8sException(msg)
577
578 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100579 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000580
581 # return new revision number
582 instance = await self.get_instance_info(
583 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
584 )
585 if instance:
586 revision = int(instance.get("revision"))
587 self.log.debug("New revision: {}".format(revision))
588 return revision
589 else:
590 return 0
591
aktas2962f3e2021-03-15 11:05:35 +0300592 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100593 self,
594 kdu_instance: str,
595 scale: int,
596 resource_name: str,
597 total_timeout: float = 1800,
aktas867418c2021-10-19 18:26:13 +0300598 cluster_uuid: str = None,
599 kdu_model: str = None,
600 atomic: bool = True,
601 db_dict: dict = None,
garciadeblas82b591c2021-03-24 09:22:13 +0100602 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300603 ):
aktas867418c2021-10-19 18:26:13 +0300604 """Scale a resource in a Helm Chart.
605
606 Args:
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
617
618 Returns:
619 True if successful, False otherwise
620 """
621
Pedro Escaleirab41de172022-04-02 00:44:08 +0100622 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300623 if resource_name:
624 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100625 resource_name, kdu_model, cluster_uuid
aktas867418c2021-10-19 18:26:13 +0300626 )
627
628 self.log.debug(debug_mgs)
629
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
633 if not instance_info:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance))
635
636 # init env, paths
637 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100638 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300639 )
640
641 # version
642 kdu_model, version = self._split_version(kdu_model)
643
644 repo_url = await self._find_repo(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300645
646 _, replica_str = await self._get_replica_count_url(
647 kdu_model, repo_url, resource_name
648 )
649
650 command = self._get_upgrade_scale_command(
651 kdu_model,
652 kdu_instance,
653 instance_info["namespace"],
654 scale,
655 version,
656 atomic,
657 replica_str,
658 total_timeout,
659 resource_name,
660 paths["kube_config"],
661 )
662
663 self.log.debug("scaling: {}".format(command))
664
665 if atomic:
666 # exec helm in a task
667 exec_task = asyncio.ensure_future(
668 coro_or_future=self._local_async_exec(
669 command=command, raise_exception_on_error=False, env=env
670 )
671 )
672 # write status in another task
673 status_task = asyncio.ensure_future(
674 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100675 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300676 kdu_instance=kdu_instance,
677 namespace=instance_info["namespace"],
678 db_dict=db_dict,
679 operation="scale",
aktas867418c2021-10-19 18:26:13 +0300680 )
681 )
682
683 # wait for execution task
684 await asyncio.wait([exec_task])
685
686 # cancel status task
687 status_task.cancel()
688 output, rc = exec_task.result()
689
690 else:
691 output, rc = await self._local_async_exec(
692 command=command, raise_exception_on_error=False, env=env
693 )
694
695 # write final status
696 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100697 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300698 kdu_instance=kdu_instance,
699 namespace=instance_info["namespace"],
700 db_dict=db_dict,
701 operation="scale",
aktas867418c2021-10-19 18:26:13 +0300702 )
703
704 if rc != 0:
705 msg = "Error executing command: {}\nOutput: {}".format(command, output)
706 self.log.error(msg)
707 raise K8sException(msg)
708
709 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100710 self.fs.reverse_sync(from_path=cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300711
712 return True
aktas2962f3e2021-03-15 11:05:35 +0300713
714 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100715 self,
716 resource_name: str,
717 kdu_instance: str,
aktas867418c2021-10-19 18:26:13 +0300718 cluster_uuid: str,
719 kdu_model: str,
garciadeblas82b591c2021-03-24 09:22:13 +0100720 **kwargs,
aktas867418c2021-10-19 18:26:13 +0300721 ) -> int:
722 """Get a resource scale count.
723
724 Args:
725 cluster_uuid: The UUID of the cluster
726 resource_name: Resource name
727 kdu_instance: KDU instance name
Pedro Escaleira547f8232022-06-03 19:48:46 +0100728 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +0300729 kwargs: Additional parameters
730
731 Returns:
732 Resource instance count
733 """
734
aktas867418c2021-10-19 18:26:13 +0300735 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100736 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300737 )
738
739 # look for instance to obtain namespace
740 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
741 if not instance_info:
742 raise K8sException("kdu_instance {} not found".format(kdu_instance))
743
744 # init env, paths
Pedro Escaleira06313992022-06-04 22:21:57 +0100745 paths, _ = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100746 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300747 )
748
749 replicas = await self._get_replica_count_instance(
Pedro Escaleiraaa5deb72022-06-05 01:29:57 +0100750 kdu_instance=kdu_instance,
751 namespace=instance_info["namespace"],
752 kubeconfig=paths["kube_config"],
753 resource_name=resource_name,
aktas867418c2021-10-19 18:26:13 +0300754 )
755
Pedro Escaleira06313992022-06-04 22:21:57 +0100756 self.log.debug(
757 f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
758 )
759
aktas867418c2021-10-19 18:26:13 +0300760 # Get default value if scale count is not found from provided values
Pedro Escaleira06313992022-06-04 22:21:57 +0100761 # Important note: this piece of code shall only be executed in the first scaling operation,
762 # since it is expected that the _get_replica_count_instance is able to obtain the number of
763 # replicas when a scale operation was already conducted previously for this KDU/resource!
764 if replicas is None:
Pedro Escaleira547f8232022-06-03 19:48:46 +0100765 repo_url = await self._find_repo(
766 kdu_model=kdu_model, cluster_uuid=cluster_uuid
767 )
aktas867418c2021-10-19 18:26:13 +0300768 replicas, _ = await self._get_replica_count_url(
Pedro Escaleira547f8232022-06-03 19:48:46 +0100769 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
aktas867418c2021-10-19 18:26:13 +0300770 )
771
Pedro Escaleira06313992022-06-04 22:21:57 +0100772 self.log.debug(
773 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
774 f"{resource_name} obtained: {replicas}"
775 )
776
777 if replicas is None:
778 msg = "Replica count not found. Cannot be scaled"
779 self.log.error(msg)
780 raise K8sException(msg)
aktas867418c2021-10-19 18:26:13 +0300781
782 return int(replicas)
aktas2962f3e2021-03-15 11:05:35 +0300783
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000784 async def rollback(
785 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
786 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000787 self.log.debug(
788 "rollback kdu_instance {} to revision {} from cluster {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100789 kdu_instance, revision, cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000790 )
791 )
792
793 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100794 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000795
796 # look for instance to obtain namespace
797 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
798 if not instance_info:
799 raise K8sException("kdu_instance {} not found".format(kdu_instance))
800
801 # init env, paths
802 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100803 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000804 )
805
bravof7bd5c6a2021-11-17 11:14:57 -0300806 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100807 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300808
garciadeblas82b591c2021-03-24 09:22:13 +0100809 command = self._get_rollback_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300810 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
garciadeblas82b591c2021-03-24 09:22:13 +0100811 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000812
813 self.log.debug("rolling_back: {}".format(command))
814
815 # exec helm in a task
816 exec_task = asyncio.ensure_future(
817 coro_or_future=self._local_async_exec(
818 command=command, raise_exception_on_error=False, env=env
819 )
820 )
821 # write status in another task
822 status_task = asyncio.ensure_future(
823 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100824 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000825 kdu_instance=kdu_instance,
826 namespace=instance_info["namespace"],
827 db_dict=db_dict,
828 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000829 )
830 )
831
832 # wait for execution task
833 await asyncio.wait([exec_task])
834
835 # cancel status task
836 status_task.cancel()
837
838 output, rc = exec_task.result()
839
840 # write final status
841 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100842 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000843 kdu_instance=kdu_instance,
844 namespace=instance_info["namespace"],
845 db_dict=db_dict,
846 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000847 )
848
849 if rc != 0:
850 msg = "Error executing command: {}\nOutput: {}".format(command, output)
851 self.log.error(msg)
852 raise K8sException(msg)
853
854 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100855 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000856
857 # return new revision number
858 instance = await self.get_instance_info(
859 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
860 )
861 if instance:
862 revision = int(instance.get("revision"))
863 self.log.debug("New revision: {}".format(revision))
864 return revision
865 else:
866 return 0
867
David Garciaeb8943a2021-04-12 12:07:37 +0200868 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000869 """
870 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
871 (this call should happen after all _terminate-config-primitive_ of the VNF
872 are invoked).
873
874 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
875 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200876 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000877 :return: True if successful
878 """
879
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000880 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100881 "uninstall kdu_instance {} from cluster {}".format(
882 kdu_instance, cluster_uuid
883 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000884 )
885
886 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100887 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000888
889 # look for instance to obtain namespace
890 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
891 if not instance_info:
David Garcia7add1872021-08-18 14:52:52 +0200892 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
893 return True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000894 # init env, paths
895 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100896 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000897 )
898
bravof7bd5c6a2021-11-17 11:14:57 -0300899 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100900 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300901
902 command = self._get_uninstall_command(
903 kdu_instance, instance_info["namespace"], paths["kube_config"]
904 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000905 output, _rc = await self._local_async_exec(
906 command=command, raise_exception_on_error=True, env=env
907 )
908
909 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100910 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000911
912 return self._output_to_table(output)
913
914 async def instances_list(self, cluster_uuid: str) -> list:
915 """
916 returns a list of deployed releases in a cluster
917
918 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
919 :return:
920 """
921
Pedro Escaleirab41de172022-04-02 00:44:08 +0100922 self.log.debug("list releases for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000923
924 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100925 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000926
927 # execute internal command
Pedro Escaleirab41de172022-04-02 00:44:08 +0100928 result = await self._instances_list(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000929
930 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100931 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000932
933 return result
934
935 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
936 instances = await self.instances_list(cluster_uuid=cluster_uuid)
937 for instance in instances:
938 if instance.get("name") == kdu_instance:
939 return instance
940 self.log.debug("Instance {} not found".format(kdu_instance))
941 return None
942
aticig8070c3c2022-04-18 00:31:42 +0300943 async def upgrade_charm(
944 self,
945 ee_id: str = None,
946 path: str = None,
947 charm_id: str = None,
948 charm_type: str = None,
949 timeout: float = None,
950 ) -> str:
951 """This method upgrade charms in VNFs
952
953 Args:
954 ee_id: Execution environment id
955 path: Local path to the charm
956 charm_id: charm-id
957 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
958 timeout: (Float) Timeout for the ns update operation
959
960 Returns:
961 The output of the update operation if status equals to "completed"
962 """
963 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
964
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000965 async def exec_primitive(
966 self,
967 cluster_uuid: str = None,
968 kdu_instance: str = None,
969 primitive_name: str = None,
970 timeout: float = 300,
971 params: dict = None,
972 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200973 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000974 ) -> str:
975 """Exec primitive (Juju action)
976
977 :param cluster_uuid: The UUID of the cluster or namespace:cluster
978 :param kdu_instance: The unique name of the KDU instance
979 :param primitive_name: Name of action that will be executed
980 :param timeout: Timeout for action execution
981 :param params: Dictionary of all the parameters needed for the action
982 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200983 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000984
985 :return: Returns the output of the action
986 """
987 raise K8sException(
988 "KDUs deployed with Helm don't support actions "
989 "different from rollback, upgrade and status"
990 )
991
garciadeblas82b591c2021-03-24 09:22:13 +0100992 async def get_services(
993 self, cluster_uuid: str, kdu_instance: str, namespace: str
994 ) -> list:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000995 """
996 Returns a list of services defined for the specified kdu instance.
997
998 :param cluster_uuid: UUID of a K8s cluster known by OSM
999 :param kdu_instance: unique name for the KDU instance
1000 :param namespace: K8s namespace used by the KDU instance
1001 :return: If successful, it will return a list of services, Each service
1002 can have the following data:
1003 - `name` of the service
1004 - `type` type of service in the k8 cluster
1005 - `ports` List of ports offered by the service, for each port includes at least
1006 name, port, protocol
1007 - `cluster_ip` Internal ip to be used inside k8s cluster
1008 - `external_ip` List of external ips (in case they are available)
1009 """
1010
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001011 self.log.debug(
1012 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1013 cluster_uuid, kdu_instance
1014 )
1015 )
1016
bravof7bd5c6a2021-11-17 11:14:57 -03001017 # init env, paths
1018 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001019 cluster_name=cluster_uuid, create_if_not_exist=True
bravof7bd5c6a2021-11-17 11:14:57 -03001020 )
1021
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001022 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001023 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001024
1025 # get list of services names for kdu
bravof7bd5c6a2021-11-17 11:14:57 -03001026 service_names = await self._get_services(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001027 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
bravof7bd5c6a2021-11-17 11:14:57 -03001028 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001029
1030 service_list = []
1031 for service in service_names:
Pedro Escaleirab41de172022-04-02 00:44:08 +01001032 service = await self._get_service(cluster_uuid, service, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001033 service_list.append(service)
1034
1035 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001036 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001037
1038 return service_list
1039
garciadeblas82b591c2021-03-24 09:22:13 +01001040 async def get_service(
1041 self, cluster_uuid: str, service_name: str, namespace: str
1042 ) -> object:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001043
1044 self.log.debug(
1045 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
garciadeblas82b591c2021-03-24 09:22:13 +01001046 service_name, namespace, cluster_uuid
1047 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001048 )
1049
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001050 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001051 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001052
Pedro Escaleirab41de172022-04-02 00:44:08 +01001053 service = await self._get_service(cluster_uuid, service_name, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001054
1055 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001056 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001057
1058 return service
1059
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001060 async def status_kdu(
1061 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1062 ) -> Union[str, dict]:
David Garciaeb8943a2021-04-12 12:07:37 +02001063 """
1064 This call would retrieve tha current state of a given KDU instance. It would be
1065 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1066 values_ of the configuration parameters applied to a given instance. This call
1067 would be based on the `status` call.
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001068
David Garciaeb8943a2021-04-12 12:07:37 +02001069 :param cluster_uuid: UUID of a K8s cluster known by OSM
1070 :param kdu_instance: unique name for the KDU instance
1071 :param kwargs: Additional parameters (None yet)
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001072 :param yaml_format: if the return shall be returned as an YAML string or as a
1073 dictionary
David Garciaeb8943a2021-04-12 12:07:37 +02001074 :return: If successful, it will return the following vector of arguments:
1075 - K8s `namespace` in the cluster where the KDU lives
1076 - `state` of the KDU instance. It can be:
1077 - UNKNOWN
1078 - DEPLOYED
1079 - DELETED
1080 - SUPERSEDED
1081 - FAILED or
1082 - DELETING
1083 - List of `resources` (objects) that this release consists of, sorted by kind,
1084 and the status of those resources
1085 - Last `deployment_time`.
1086
1087 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001088 self.log.debug(
1089 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1090 cluster_uuid, kdu_instance
1091 )
1092 )
1093
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001094 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001095 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001096
1097 # get instance: needed to obtain namespace
Pedro Escaleirab41de172022-04-02 00:44:08 +01001098 instances = await self._instances_list(cluster_id=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001099 for instance in instances:
1100 if instance.get("name") == kdu_instance:
1101 break
1102 else:
1103 # instance does not exist
garciadeblas82b591c2021-03-24 09:22:13 +01001104 raise K8sException(
1105 "Instance name: {} not found in cluster: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001106 kdu_instance, cluster_uuid
garciadeblas82b591c2021-03-24 09:22:13 +01001107 )
1108 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001109
1110 status = await self._status_kdu(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001111 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001112 kdu_instance=kdu_instance,
1113 namespace=instance["namespace"],
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001114 yaml_format=yaml_format,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001115 show_error_log=True,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001116 )
1117
1118 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001119 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001120
1121 return status
1122
aktas867418c2021-10-19 18:26:13 +03001123 async def get_values_kdu(
1124 self, kdu_instance: str, namespace: str, kubeconfig: str
1125 ) -> str:
1126
1127 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1128
1129 return await self._exec_get_command(
1130 get_command="values",
1131 kdu_instance=kdu_instance,
1132 namespace=namespace,
1133 kubeconfig=kubeconfig,
1134 )
1135
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001136 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001137 """Method to obtain the Helm Chart package's values
1138
1139 Args:
1140 kdu_model: The name or path of an Helm Chart
1141 repo_url: Helm Chart repository url
1142
1143 Returns:
1144 str: the values of the Helm Chart package
1145 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001146
1147 self.log.debug(
1148 "inspect kdu_model values {} from (optional) repo: {}".format(
1149 kdu_model, repo_url
1150 )
1151 )
1152
aktas867418c2021-10-19 18:26:13 +03001153 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001154 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1155 )
1156
1157 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1158
1159 self.log.debug(
1160 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1161 )
1162
aktas867418c2021-10-19 18:26:13 +03001163 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001164 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1165 )
1166
1167 async def synchronize_repos(self, cluster_uuid: str):
1168
1169 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1170 try:
1171 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1172 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1173
1174 local_repo_list = await self.repo_list(cluster_uuid)
1175 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1176
1177 deleted_repo_list = []
1178 added_repo_dict = {}
1179
1180 # iterate over the list of repos in the database that should be
1181 # added if not present
1182 for repo_name, db_repo in db_repo_dict.items():
1183 try:
1184 # check if it is already present
1185 curr_repo_url = local_repo_dict.get(db_repo["name"])
1186 repo_id = db_repo.get("_id")
1187 if curr_repo_url != db_repo["url"]:
1188 if curr_repo_url:
garciadeblas82b591c2021-03-24 09:22:13 +01001189 self.log.debug(
1190 "repo {} url changed, delete and and again".format(
1191 db_repo["url"]
1192 )
1193 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001194 await self.repo_remove(cluster_uuid, db_repo["name"])
1195 deleted_repo_list.append(repo_id)
1196
1197 # add repo
1198 self.log.debug("add repo {}".format(db_repo["name"]))
bravof0ab522f2021-11-23 19:33:18 -03001199 if "ca_cert" in db_repo:
1200 await self.repo_add(
1201 cluster_uuid,
1202 db_repo["name"],
1203 db_repo["url"],
1204 cert=db_repo["ca_cert"],
1205 )
1206 else:
1207 await self.repo_add(
1208 cluster_uuid,
1209 db_repo["name"],
1210 db_repo["url"],
1211 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001212 added_repo_dict[repo_id] = db_repo["name"]
1213 except Exception as e:
1214 raise K8sException(
1215 "Error adding repo id: {}, err_msg: {} ".format(
1216 repo_id, repr(e)
1217 )
1218 )
1219
1220 # Delete repos that are present but not in nbi_list
1221 for repo_name in local_repo_dict:
1222 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1223 self.log.debug("delete repo {}".format(repo_name))
1224 try:
1225 await self.repo_remove(cluster_uuid, repo_name)
1226 deleted_repo_list.append(repo_name)
1227 except Exception as e:
1228 self.warning(
1229 "Error deleting repo, name: {}, err_msg: {}".format(
1230 repo_name, str(e)
1231 )
1232 )
1233
1234 return deleted_repo_list, added_repo_dict
1235
1236 except K8sException:
1237 raise
1238 except Exception as e:
1239 # Do not raise errors synchronizing repos
1240 self.log.error("Error synchronizing repos: {}".format(e))
1241 raise Exception("Error synchronizing repos: {}".format(e))
1242
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001243 def _get_db_repos_dict(self, repo_ids: list):
1244 db_repos_dict = {}
1245 for repo_id in repo_ids:
1246 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1247 db_repos_dict[db_repo["name"]] = db_repo
1248 return db_repos_dict
1249
1250 """
1251 ####################################################################################
1252 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1253 ####################################################################################
1254 """
1255
1256 @abc.abstractmethod
1257 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1258 """
1259 Creates and returns base cluster and kube dirs and returns them.
1260 Also created helm3 dirs according to new directory specification, paths are
1261 not returned but assigned to helm environment variables
1262
1263 :param cluster_name: cluster_name
1264 :return: Dictionary with config_paths and dictionary with helm environment variables
1265 """
1266
1267 @abc.abstractmethod
1268 async def _cluster_init(self, cluster_id, namespace, paths, env):
1269 """
1270 Implements the helm version dependent cluster initialization
1271 """
1272
1273 @abc.abstractmethod
1274 async def _instances_list(self, cluster_id):
1275 """
1276 Implements the helm version dependent helm instances list
1277 """
1278
1279 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001280 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001281 """
1282 Implements the helm version dependent method to obtain services from a helm instance
1283 """
1284
1285 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001286 async def _status_kdu(
1287 self,
1288 cluster_id: str,
1289 kdu_instance: str,
1290 namespace: str = None,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001291 yaml_format: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001292 show_error_log: bool = False,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001293 ) -> Union[str, dict]:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001294 """
1295 Implements the helm version dependent method to obtain status of a helm instance
1296 """
1297
1298 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001299 def _get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001300 self,
1301 kdu_model,
1302 kdu_instance,
1303 namespace,
1304 params_str,
1305 version,
1306 atomic,
1307 timeout,
1308 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001309 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001310 """
1311 Obtain command to be executed to delete the indicated instance
1312 """
1313
1314 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001315 def _get_upgrade_scale_command(
1316 self,
1317 kdu_model,
1318 kdu_instance,
1319 namespace,
1320 count,
1321 version,
1322 atomic,
1323 replicas,
1324 timeout,
1325 resource_name,
1326 kubeconfig,
1327 ) -> str:
Pedro Escaleira0a2060c2022-07-07 22:18:35 +01001328 """Generates the command to scale a Helm Chart release
1329
1330 Args:
1331 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1332 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1333 namespace (str): Namespace where this KDU instance is deployed
1334 scale (int): Scale count
1335 version (str): Constraint with specific version of the Chart to use
1336 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1337 The --wait flag will be set automatically if --atomic is used
1338 replica_str (str): The key under resource_name key where the scale count is stored
1339 timeout (float): The time, in seconds, to wait
1340 resource_name (str): The KDU's resource to scale
1341 kubeconfig (str): Kubeconfig file path
1342
1343 Returns:
1344 str: command to scale a Helm Chart release
1345 """
aktas867418c2021-10-19 18:26:13 +03001346
1347 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001348 def _get_upgrade_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001349 self,
1350 kdu_model,
1351 kdu_instance,
1352 namespace,
1353 params_str,
1354 version,
1355 atomic,
1356 timeout,
1357 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001358 ) -> str:
Pedro Escaleira0a2060c2022-07-07 22:18:35 +01001359 """Generates the command to upgrade a Helm Chart release
1360
1361 Args:
1362 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1363 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1364 namespace (str): Namespace where this KDU instance is deployed
1365 params_str (str): Params used to upgrade the Helm Chart release
1366 version (str): Constraint with specific version of the Chart to use
1367 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1368 The --wait flag will be set automatically if --atomic is used
1369 timeout (float): The time, in seconds, to wait
1370 kubeconfig (str): Kubeconfig file path
1371
1372 Returns:
1373 str: command to upgrade a Helm Chart release
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001374 """
1375
1376 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001377 def _get_rollback_command(
1378 self, kdu_instance, namespace, revision, kubeconfig
1379 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001380 """
1381 Obtain command to be executed to rollback the indicated instance
1382 """
1383
1384 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001385 def _get_uninstall_command(
1386 self, kdu_instance: str, namespace: str, kubeconfig: str
1387 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001388 """
1389 Obtain command to be executed to delete the indicated instance
1390 """
1391
1392 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001393 def _get_inspect_command(
1394 self, show_command: str, kdu_model: str, repo_str: str, version: str
1395 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001396 """Generates the command to obtain the information about an Helm Chart package
1397 (´helm show ...´ command)
1398
1399 Args:
1400 show_command: the second part of the command (`helm show <show_command>`)
1401 kdu_model: The name or path of an Helm Chart
1402 repo_url: Helm Chart repository url
1403 version: constraint with specific version of the Chart to use
1404
1405 Returns:
1406 str: the generated Helm Chart command
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001407 """
1408
1409 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001410 def _get_get_command(
1411 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1412 ):
1413 """Obtain command to be executed to get information about the kdu instance."""
1414
1415 @abc.abstractmethod
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001416 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1417 """
1418 Method call to uninstall cluster software for helm. This method is dependent
1419 of helm version
1420 For Helm v2 it will be called when Tiller must be uninstalled
1421 For Helm v3 it does nothing and does not need to be callled
1422 """
1423
lloretgalleg095392b2020-11-20 11:28:08 +00001424 @abc.abstractmethod
1425 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1426 """
1427 Obtains the cluster repos identifiers
1428 """
1429
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001430 """
1431 ####################################################################################
1432 ################################### P R I V A T E ##################################
1433 ####################################################################################
1434 """
1435
1436 @staticmethod
1437 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1438 if os.path.exists(filename):
1439 return True
1440 else:
1441 msg = "File {} does not exist".format(filename)
1442 if exception_if_not_exists:
1443 raise K8sException(msg)
1444
1445 @staticmethod
1446 def _remove_multiple_spaces(strobj):
1447 strobj = strobj.strip()
1448 while " " in strobj:
1449 strobj = strobj.replace(" ", " ")
1450 return strobj
1451
1452 @staticmethod
1453 def _output_to_lines(output: str) -> list:
1454 output_lines = list()
1455 lines = output.splitlines(keepends=False)
1456 for line in lines:
1457 line = line.strip()
1458 if len(line) > 0:
1459 output_lines.append(line)
1460 return output_lines
1461
1462 @staticmethod
1463 def _output_to_table(output: str) -> list:
1464 output_table = list()
1465 lines = output.splitlines(keepends=False)
1466 for line in lines:
1467 line = line.replace("\t", " ")
1468 line_list = list()
1469 output_table.append(line_list)
1470 cells = line.split(sep=" ")
1471 for cell in cells:
1472 cell = cell.strip()
1473 if len(cell) > 0:
1474 line_list.append(cell)
1475 return output_table
1476
1477 @staticmethod
1478 def _parse_services(output: str) -> list:
1479 lines = output.splitlines(keepends=False)
1480 services = []
1481 for line in lines:
1482 line = line.replace("\t", " ")
1483 cells = line.split(sep=" ")
1484 if len(cells) > 0 and cells[0].startswith("service/"):
1485 elems = cells[0].split(sep="/")
1486 if len(elems) > 1:
1487 services.append(elems[1])
1488 return services
1489
1490 @staticmethod
1491 def _get_deep(dictionary: dict, members: tuple):
1492 target = dictionary
1493 value = None
1494 try:
1495 for m in members:
1496 value = target.get(m)
1497 if not value:
1498 return None
1499 else:
1500 target = value
1501 except Exception:
1502 pass
1503 return value
1504
1505 # find key:value in several lines
1506 @staticmethod
1507 def _find_in_lines(p_lines: list, p_key: str) -> str:
1508 for line in p_lines:
1509 try:
1510 if line.startswith(p_key + ":"):
1511 parts = line.split(":")
1512 the_value = parts[1].strip()
1513 return the_value
1514 except Exception:
1515 # ignore it
1516 pass
1517 return None
1518
1519 @staticmethod
1520 def _lower_keys_list(input_list: list):
1521 """
1522 Transform the keys in a list of dictionaries to lower case and returns a new list
1523 of dictionaries
1524 """
1525 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001526 if input_list:
1527 for dictionary in input_list:
1528 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1529 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001530 return new_list
1531
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001532 async def _local_async_exec(
1533 self,
1534 command: str,
1535 raise_exception_on_error: bool = False,
1536 show_error_log: bool = True,
1537 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001538 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001539 ) -> (str, int):
1540
1541 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001542 self.log.debug(
1543 "Executing async local command: {}, env: {}".format(command, env)
1544 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001545
1546 # split command
1547 command = shlex.split(command)
1548
1549 environ = os.environ.copy()
1550 if env:
1551 environ.update(env)
1552
1553 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001554 async with self.cmd_lock:
1555 process = await asyncio.create_subprocess_exec(
1556 *command,
1557 stdout=asyncio.subprocess.PIPE,
1558 stderr=asyncio.subprocess.PIPE,
1559 env=environ,
1560 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001561
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001562 # wait for command terminate
1563 stdout, stderr = await process.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001564
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001565 return_code = process.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001566
1567 output = ""
1568 if stdout:
1569 output = stdout.decode("utf-8").strip()
1570 # output = stdout.decode()
1571 if stderr:
1572 output = stderr.decode("utf-8").strip()
1573 # output = stderr.decode()
1574
1575 if return_code != 0 and show_error_log:
1576 self.log.debug(
1577 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1578 )
1579 else:
1580 self.log.debug("Return code: {}".format(return_code))
1581
1582 if raise_exception_on_error and return_code != 0:
1583 raise K8sException(output)
1584
1585 if encode_utf8:
1586 output = output.encode("utf-8").strip()
1587 output = str(output).replace("\\n", "\n")
1588
1589 return output, return_code
1590
1591 except asyncio.CancelledError:
Pedro Escaleirad3817992022-07-23 23:34:42 +01001592 # first, kill the process if it is still running
1593 if process.returncode is None:
1594 process.kill()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001595 raise
1596 except K8sException:
1597 raise
1598 except Exception as e:
1599 msg = "Exception executing command: {} -> {}".format(command, e)
1600 self.log.error(msg)
1601 if raise_exception_on_error:
1602 raise K8sException(e) from e
1603 else:
1604 return "", -1
1605
garciadeblas82b591c2021-03-24 09:22:13 +01001606 async def _local_async_exec_pipe(
1607 self,
1608 command1: str,
1609 command2: str,
1610 raise_exception_on_error: bool = True,
1611 show_error_log: bool = True,
1612 encode_utf8: bool = False,
1613 env: dict = None,
1614 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001615
1616 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1617 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1618 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001619 self.log.debug(
1620 "Executing async local command: {}, env: {}".format(command, env)
1621 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001622
1623 # split command
1624 command1 = shlex.split(command1)
1625 command2 = shlex.split(command2)
1626
1627 environ = os.environ.copy()
1628 if env:
1629 environ.update(env)
1630
1631 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001632 async with self.cmd_lock:
1633 read, write = os.pipe()
Pedro Escaleirad3817992022-07-23 23:34:42 +01001634 process_1 = await asyncio.create_subprocess_exec(
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001635 *command1, stdout=write, env=environ
1636 )
1637 os.close(write)
1638 process_2 = await asyncio.create_subprocess_exec(
1639 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1640 )
1641 os.close(read)
1642 stdout, stderr = await process_2.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001643
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001644 return_code = process_2.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001645
1646 output = ""
1647 if stdout:
1648 output = stdout.decode("utf-8").strip()
1649 # output = stdout.decode()
1650 if stderr:
1651 output = stderr.decode("utf-8").strip()
1652 # output = stderr.decode()
1653
1654 if return_code != 0 and show_error_log:
1655 self.log.debug(
1656 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1657 )
1658 else:
1659 self.log.debug("Return code: {}".format(return_code))
1660
1661 if raise_exception_on_error and return_code != 0:
1662 raise K8sException(output)
1663
1664 if encode_utf8:
1665 output = output.encode("utf-8").strip()
1666 output = str(output).replace("\\n", "\n")
1667
1668 return output, return_code
1669 except asyncio.CancelledError:
Pedro Escaleirad3817992022-07-23 23:34:42 +01001670 # first, kill the processes if they are still running
1671 for process in (process_1, process_2):
1672 if process.returncode is None:
1673 process.kill()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001674 raise
1675 except K8sException:
1676 raise
1677 except Exception as e:
1678 msg = "Exception executing command: {} -> {}".format(command, e)
1679 self.log.error(msg)
1680 if raise_exception_on_error:
1681 raise K8sException(e) from e
1682 else:
1683 return "", -1
1684
1685 async def _get_service(self, cluster_id, service_name, namespace):
1686 """
1687 Obtains the data of the specified service in the k8cluster.
1688
1689 :param cluster_id: id of a K8s cluster known by OSM
1690 :param service_name: name of the K8s service in the specified namespace
1691 :param namespace: K8s namespace used by the KDU instance
1692 :return: If successful, it will return a service with the following data:
1693 - `name` of the service
1694 - `type` type of service in the k8 cluster
1695 - `ports` List of ports offered by the service, for each port includes at least
1696 name, port, protocol
1697 - `cluster_ip` Internal ip to be used inside k8s cluster
1698 - `external_ip` List of external ips (in case they are available)
1699 """
1700
1701 # init config, env
1702 paths, env = self._init_paths_env(
1703 cluster_name=cluster_id, create_if_not_exist=True
1704 )
1705
1706 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1707 self.kubectl_command, paths["kube_config"], namespace, service_name
1708 )
1709
1710 output, _rc = await self._local_async_exec(
1711 command=command, raise_exception_on_error=True, env=env
1712 )
1713
1714 data = yaml.load(output, Loader=yaml.SafeLoader)
1715
1716 service = {
1717 "name": service_name,
1718 "type": self._get_deep(data, ("spec", "type")),
1719 "ports": self._get_deep(data, ("spec", "ports")),
garciadeblas82b591c2021-03-24 09:22:13 +01001720 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001721 }
1722 if service["type"] == "LoadBalancer":
1723 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1724 ip_list = [elem["ip"] for elem in ip_map_list]
1725 service["external_ip"] = ip_list
1726
1727 return service
1728
aktas867418c2021-10-19 18:26:13 +03001729 async def _exec_get_command(
1730 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1731 ):
1732 """Obtains information about the kdu instance."""
1733
1734 full_command = self._get_get_command(
1735 get_command, kdu_instance, namespace, kubeconfig
1736 )
1737
1738 output, _rc = await self._local_async_exec(command=full_command)
1739
1740 return output
1741
1742 async def _exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001743 self, inspect_command: str, kdu_model: str, repo_url: str = None
1744 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001745 """Obtains information about an Helm Chart package (´helm show´ command)
1746
1747 Args:
1748 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1749 kdu_model: The name or path of an Helm Chart
1750 repo_url: Helm Chart repository url
1751
1752 Returns:
1753 str: the requested info about the Helm Chart package
1754 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001755
1756 repo_str = ""
1757 if repo_url:
1758 repo_str = " --repo {}".format(repo_url)
1759
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01001760 # Obtain the Chart's name and store it in the var kdu_model
1761 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001762
aktas867418c2021-10-19 18:26:13 +03001763 kdu_model, version = self._split_version(kdu_model)
1764 if version:
1765 version_str = "--version {}".format(version)
1766 else:
1767 version_str = ""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001768
garciadeblas82b591c2021-03-24 09:22:13 +01001769 full_command = self._get_inspect_command(
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01001770 show_command=inspect_command,
1771 kdu_model=kdu_model,
1772 repo_str=repo_str,
1773 version=version_str,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001774 )
1775
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01001776 output, _ = await self._local_async_exec(command=full_command)
aktas867418c2021-10-19 18:26:13 +03001777
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001778 return output
1779
aktas867418c2021-10-19 18:26:13 +03001780 async def _get_replica_count_url(
1781 self,
1782 kdu_model: str,
Pedro Escaleira547f8232022-06-03 19:48:46 +01001783 repo_url: str = None,
aktas867418c2021-10-19 18:26:13 +03001784 resource_name: str = None,
Pedro Escaleira06313992022-06-04 22:21:57 +01001785 ) -> (int, str):
aktas867418c2021-10-19 18:26:13 +03001786 """Get the replica count value in the Helm Chart Values.
1787
1788 Args:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001789 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +03001790 repo_url: Helm Chart repository url
1791 resource_name: Resource name
1792
1793 Returns:
Pedro Escaleira06313992022-06-04 22:21:57 +01001794 A tuple with:
1795 - The number of replicas of the specific instance; if not found, returns None; and
1796 - The string corresponding to the replica count key in the Helm values
aktas867418c2021-10-19 18:26:13 +03001797 """
1798
1799 kdu_values = yaml.load(
Pedro Escaleira547f8232022-06-03 19:48:46 +01001800 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1801 Loader=yaml.SafeLoader,
aktas867418c2021-10-19 18:26:13 +03001802 )
1803
Pedro Escaleira06313992022-06-04 22:21:57 +01001804 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1805
aktas867418c2021-10-19 18:26:13 +03001806 if not kdu_values:
1807 raise K8sException(
1808 "kdu_values not found for kdu_model {}".format(kdu_model)
1809 )
1810
1811 if resource_name:
1812 kdu_values = kdu_values.get(resource_name, None)
1813
1814 if not kdu_values:
1815 msg = "resource {} not found in the values in model {}".format(
1816 resource_name, kdu_model
1817 )
1818 self.log.error(msg)
1819 raise K8sException(msg)
1820
1821 duplicate_check = False
1822
1823 replica_str = ""
1824 replicas = None
1825
Pedro Escaleira06313992022-06-04 22:21:57 +01001826 if kdu_values.get("replicaCount") is not None:
aktas867418c2021-10-19 18:26:13 +03001827 replicas = kdu_values["replicaCount"]
1828 replica_str = "replicaCount"
Pedro Escaleira06313992022-06-04 22:21:57 +01001829 elif kdu_values.get("replicas") is not None:
aktas867418c2021-10-19 18:26:13 +03001830 duplicate_check = True
1831 replicas = kdu_values["replicas"]
1832 replica_str = "replicas"
1833 else:
1834 if resource_name:
1835 msg = (
1836 "replicaCount or replicas not found in the resource"
1837 "{} values in model {}. Cannot be scaled".format(
1838 resource_name, kdu_model
1839 )
1840 )
1841 else:
1842 msg = (
1843 "replicaCount or replicas not found in the values"
1844 "in model {}. Cannot be scaled".format(kdu_model)
1845 )
1846 self.log.error(msg)
1847 raise K8sException(msg)
1848
1849 # Control if replicas and replicaCount exists at the same time
1850 msg = "replicaCount and replicas are exists at the same time"
1851 if duplicate_check:
1852 if "replicaCount" in kdu_values:
1853 self.log.error(msg)
1854 raise K8sException(msg)
1855 else:
1856 if "replicas" in kdu_values:
1857 self.log.error(msg)
1858 raise K8sException(msg)
1859
1860 return replicas, replica_str
1861
1862 async def _get_replica_count_instance(
1863 self,
1864 kdu_instance: str,
1865 namespace: str,
1866 kubeconfig: str,
1867 resource_name: str = None,
Pedro Escaleira06313992022-06-04 22:21:57 +01001868 ) -> int:
aktas867418c2021-10-19 18:26:13 +03001869 """Get the replica count value in the instance.
1870
1871 Args:
1872 kdu_instance: The name of the KDU instance
1873 namespace: KDU instance namespace
1874 kubeconfig:
1875 resource_name: Resource name
1876
1877 Returns:
Pedro Escaleira06313992022-06-04 22:21:57 +01001878 The number of replicas of the specific instance; if not found, returns None
aktas867418c2021-10-19 18:26:13 +03001879 """
1880
1881 kdu_values = yaml.load(
1882 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1883 Loader=yaml.SafeLoader,
1884 )
1885
Pedro Escaleira06313992022-06-04 22:21:57 +01001886 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1887
aktas867418c2021-10-19 18:26:13 +03001888 replicas = None
1889
1890 if kdu_values:
1891 resource_values = (
1892 kdu_values.get(resource_name, None) if resource_name else None
1893 )
Pedro Escaleira06313992022-06-04 22:21:57 +01001894
1895 for replica_str in ("replicaCount", "replicas"):
1896 if resource_values:
1897 replicas = resource_values.get(replica_str)
1898 else:
1899 replicas = kdu_values.get(replica_str)
1900
1901 if replicas is not None:
1902 break
aktas867418c2021-10-19 18:26:13 +03001903
1904 return replicas
1905
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001906 async def _store_status(
1907 self,
1908 cluster_id: str,
1909 operation: str,
1910 kdu_instance: str,
1911 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001912 db_dict: dict = None,
Pedro Escaleirab46f88d2022-04-23 19:55:45 +01001913 ) -> None:
1914 """
1915 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1916
1917 :param cluster_id (str): the cluster where the KDU instance is deployed
1918 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1919 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1920 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1921 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1922 values for the keys:
1923 - "collection": The Mongo DB collection to write to
1924 - "filter": The query filter to use in the update process
1925 - "path": The dot separated keys which targets the object to be updated
1926 Defaults to None.
1927 """
1928
1929 try:
1930 detailed_status = await self._status_kdu(
1931 cluster_id=cluster_id,
1932 kdu_instance=kdu_instance,
1933 yaml_format=False,
1934 namespace=namespace,
1935 )
1936
1937 status = detailed_status.get("info").get("description")
1938 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1939
1940 # write status to db
1941 result = await self.write_app_status_to_db(
1942 db_dict=db_dict,
1943 status=str(status),
1944 detailed_status=str(detailed_status),
1945 operation=operation,
1946 )
1947
1948 if not result:
1949 self.log.info("Error writing in database. Task exiting...")
1950
1951 except asyncio.CancelledError as e:
1952 self.log.warning(
1953 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1954 )
1955 except Exception as e:
1956 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001957
1958 # params for use in -f file
1959 # returns values file option and filename (in order to delete it at the end)
1960 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1961
1962 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001963 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001964
1965 def get_random_number():
1966 r = random.randrange(start=1, stop=99999999)
1967 s = str(r)
1968 while len(s) < 10:
1969 s = "0" + s
1970 return s
1971
1972 params2 = dict()
1973 for key in params:
1974 value = params.get(key)
1975 if "!!yaml" in str(value):
David Garcia513cb2d2022-05-31 11:01:09 +02001976 value = yaml.safe_load(value[7:])
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001977 params2[key] = value
1978
1979 values_file = get_random_number() + ".yaml"
1980 with open(values_file, "w") as stream:
1981 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1982
1983 return "-f {}".format(values_file), values_file
1984
1985 return "", None
1986
1987 # params for use in --set option
1988 @staticmethod
1989 def _params_to_set_option(params: dict) -> str:
1990 params_str = ""
1991 if params and len(params) > 0:
1992 start = True
1993 for key in params:
1994 value = params.get(key, None)
1995 if value is not None:
1996 if start:
1997 params_str += "--set "
1998 start = False
1999 else:
2000 params_str += ","
2001 params_str += "{}={}".format(key, value)
2002 return params_str
2003
2004 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01002005 def generate_kdu_instance_name(**kwargs):
2006 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00002007 # check embeded chart (file or dir)
2008 if chart_name.startswith("/"):
2009 # extract file or directory name
David Garcia4ae527e2021-07-26 16:04:59 +02002010 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00002011 # check URL
2012 elif "://" in chart_name:
2013 # extract last portion of URL
David Garcia4ae527e2021-07-26 16:04:59 +02002014 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00002015
2016 name = ""
2017 for c in chart_name:
2018 if c.isalpha() or c.isnumeric():
2019 name += c
2020 else:
2021 name += "-"
2022 if len(name) > 35:
2023 name = name[0:35]
2024
2025 # if does not start with alpha character, prefix 'a'
2026 if not name[0].isalpha():
2027 name = "a" + name
2028
2029 name += "-"
2030
2031 def get_random_number():
2032 r = random.randrange(start=1, stop=99999999)
2033 s = str(r)
2034 s = s.rjust(10, "0")
2035 return s
2036
2037 name = name + get_random_number()
2038 return name.lower()
aktas867418c2021-10-19 18:26:13 +03002039
2040 def _split_version(self, kdu_model: str) -> (str, str):
2041 version = None
garciadeblas04393192022-06-08 15:39:24 +02002042 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
aktas867418c2021-10-19 18:26:13 +03002043 parts = kdu_model.split(sep=":")
2044 if len(parts) == 2:
2045 version = str(parts[1])
2046 kdu_model = parts[0]
2047 return kdu_model, version
2048
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002049 def _split_repo(self, kdu_model: str) -> (str, str):
2050 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2051
2052 Args:
2053 kdu_model (str): Associated KDU model
2054
2055 Returns:
2056 (str, str): Tuple with the Chart name in index 0, and the repo name
2057 in index 2; if there was a problem finding them, return None
2058 for both
2059 """
2060
2061 chart_name = None
garciadeblas7faf4ec2022-04-08 22:53:25 +02002062 repo_name = None
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002063
garciadeblas7faf4ec2022-04-08 22:53:25 +02002064 idx = kdu_model.find("/")
2065 if idx >= 0:
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002066 chart_name = kdu_model[idx + 1 :]
garciadeblas7faf4ec2022-04-08 22:53:25 +02002067 repo_name = kdu_model[:idx]
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002068
2069 return chart_name, repo_name
garciadeblas7faf4ec2022-04-08 22:53:25 +02002070
aktas867418c2021-10-19 18:26:13 +03002071 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01002072 """Obtain the Helm repository for an Helm Chart
2073
2074 Args:
2075 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2076 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2077
2078 Returns:
2079 str: the repository URL; if Helm Chart is a local one, the function returns None
2080 """
2081
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002082 _, repo_name = self._split_repo(kdu_model=kdu_model)
2083
aktas867418c2021-10-19 18:26:13 +03002084 repo_url = None
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002085 if repo_name:
aktas867418c2021-10-19 18:26:13 +03002086 # Find repository link
2087 local_repo_list = await self.repo_list(cluster_uuid)
2088 for repo in local_repo_list:
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002089 if repo["name"] == repo_name:
2090 repo_url = repo["url"]
2091 break # it is not necessary to continue the loop if the repo link was found...
2092
aktas867418c2021-10-19 18:26:13 +03002093 return repo_url