blob: afb7bc23814befc2e5483308d3eb89595aae46a0 [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
David Garcia4395cfa2021-05-28 16:21:51 +020073 self.config = EnvironConfig()
lloretgalleg1c83f2e2020-10-22 09:12:35 +000074 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
David Garcia4395cfa2021-05-28 16:21:51 +020089 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
lloretgalleg83e55892020-12-17 12:42:11 +000092
Pedro Escaleira1f222a92022-06-20 15:40:43 +010093 # Lock to avoid concurrent execution of helm commands
94 self.cmd_lock = asyncio.Lock()
95
Pedro Escaleirab41de172022-04-02 00:44:08 +010096 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000097 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010098 Obtains the namespace used by the cluster with the uuid passed by argument
99
100 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000101 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100102
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster = self.db.get_one(
105 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
106 )
107 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000108
109 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100110 self,
111 k8s_creds: str,
112 namespace: str = "kube-system",
113 reuse_cluster_uuid=None,
114 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000115 ) -> (str, bool):
116 """
117 It prepares a given K8s cluster environment to run Charts
118
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
120 '.kube/config'
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200124 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
128 """
129
130 if reuse_cluster_uuid:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100131 cluster_id = reuse_cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000132 else:
133 cluster_id = str(uuid4())
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000134
garciadeblas82b591c2021-03-24 09:22:13 +0100135 self.log.debug(
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
137 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000138
139 paths, env = self._init_paths_env(
140 cluster_name=cluster_id, create_if_not_exist=True
141 )
142 mode = stat.S_IRUSR | stat.S_IWUSR
143 with open(paths["kube_config"], "w", mode) as f:
144 f.write(k8s_creds)
145 os.chmod(paths["kube_config"], 0o600)
146
147 # Code with initialization specific of helm version
148 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
149
150 # sync fs with local data
151 self.fs.reverse_sync(from_path=cluster_id)
152
153 self.log.info("Cluster {} initialized".format(cluster_id))
154
Pedro Escaleirab41de172022-04-02 00:44:08 +0100155 return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000156
157 async def repo_add(
bravof0ab522f2021-11-23 19:33:18 -0300158 self,
159 cluster_uuid: str,
160 name: str,
161 url: str,
162 repo_type: str = "chart",
163 cert: str = None,
164 user: str = None,
165 password: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000166 ):
garciadeblas82b591c2021-03-24 09:22:13 +0100167 self.log.debug(
168 "Cluster {}, adding {} repository {}. URL: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100169 cluster_uuid, repo_type, name, url
garciadeblas82b591c2021-03-24 09:22:13 +0100170 )
171 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000172
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000173 # init_env
174 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100175 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000176 )
177
bravof7bd5c6a2021-11-17 11:14:57 -0300178 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100179 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300180
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000181 # helm repo add name url
bravof0ab522f2021-11-23 19:33:18 -0300182 command = ("env KUBECONFIG={} {} repo add {} {}").format(
bravof7bd5c6a2021-11-17 11:14:57 -0300183 paths["kube_config"], self._helm_command, name, url
184 )
bravof0ab522f2021-11-23 19:33:18 -0300185
186 if cert:
187 temp_cert_file = os.path.join(
Pedro Escaleira1188b5d2022-04-22 18:51:00 +0100188 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
bravof0ab522f2021-11-23 19:33:18 -0300189 )
190 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
191 with open(temp_cert_file, "w") as the_cert:
192 the_cert.write(cert)
193 command += " --ca-file {}".format(temp_cert_file)
194
195 if user:
196 command += " --username={}".format(user)
197
198 if password:
199 command += " --password={}".format(password)
200
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000201 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100202 await self._local_async_exec(
203 command=command, raise_exception_on_error=True, env=env
204 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000205
garciadeblasd4cee8c2022-05-04 10:57:36 +0200206 # helm repo update
garciadeblas069f0a32022-05-04 11:07:41 +0200207 command = "env KUBECONFIG={} {} repo update {}".format(
208 paths["kube_config"], self._helm_command, name
garciadeblasd4cee8c2022-05-04 10:57:36 +0200209 )
210 self.log.debug("updating repo: {}".format(command))
211 await self._local_async_exec(
212 command=command, raise_exception_on_error=False, env=env
213 )
214
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000215 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100216 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000217
garciadeblas7faf4ec2022-04-08 22:53:25 +0200218 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
219 self.log.debug(
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid, repo_type, name
222 )
223 )
224
225 # init_env
226 paths, env = self._init_paths_env(
227 cluster_name=cluster_uuid, create_if_not_exist=True
228 )
229
230 # sync local dir
231 self.fs.sync(from_path=cluster_uuid)
232
233 # helm repo update
234 command = "{} repo update {}".format(self._helm_command, name)
235 self.log.debug("updating repo: {}".format(command))
236 await self._local_async_exec(
237 command=command, raise_exception_on_error=False, env=env
238 )
239
240 # sync fs
241 self.fs.reverse_sync(from_path=cluster_uuid)
242
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000243 async def repo_list(self, cluster_uuid: str) -> list:
244 """
245 Get the list of registered repositories
246
247 :return: list of registered repositories: [ (name, url) .... ]
248 """
249
Pedro Escaleirab41de172022-04-02 00:44:08 +0100250 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000251
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000252 # config filename
253 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100254 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000255 )
256
bravof7bd5c6a2021-11-17 11:14:57 -0300257 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100258 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300259
260 command = "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths["kube_config"], self._helm_command
262 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000263
264 # Set exception to false because if there are no repos just want an empty list
265 output, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=False, env=env
267 )
268
269 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100270 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000271
272 if _rc == 0:
273 if output and len(output) > 0:
274 repos = yaml.load(output, Loader=yaml.SafeLoader)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self._lower_keys_list(repos)
277 else:
278 return []
279 else:
280 return []
281
282 async def repo_remove(self, cluster_uuid: str, name: str):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100283 self.log.debug(
284 "remove {} repositories for cluster {}".format(name, cluster_uuid)
285 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000286
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000287 # init env, paths
288 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100289 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000290 )
291
bravof7bd5c6a2021-11-17 11:14:57 -0300292 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100293 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300294
295 command = "env KUBECONFIG={} {} repo remove {}".format(
296 paths["kube_config"], self._helm_command, name
297 )
garciadeblas82b591c2021-03-24 09:22:13 +0100298 await self._local_async_exec(
299 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000300 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000301
302 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100303 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000304
305 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100306 self,
307 cluster_uuid: str,
308 force: bool = False,
309 uninstall_sw: bool = False,
310 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000311 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200312 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000313
David Garciaeb8943a2021-04-12 12:07:37 +0200314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
315
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to force the reset
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
321 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100322 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100323 self.log.debug(
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100325 cluster_uuid, uninstall_sw
garciadeblas82b591c2021-03-24 09:22:13 +0100326 )
327 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000328
329 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100330 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000331
332 # uninstall releases if needed.
333 if uninstall_sw:
334 releases = await self.instances_list(cluster_uuid=cluster_uuid)
335 if len(releases) > 0:
336 if force:
337 for r in releases:
338 try:
339 kdu_instance = r.get("name")
340 chart = r.get("chart")
341 self.log.debug(
342 "Uninstalling {} -> {}".format(chart, kdu_instance)
343 )
344 await self.uninstall(
345 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
346 )
347 except Exception as e:
348 # will not raise exception as it was found
349 # that in some cases of previously installed helm releases it
350 # raised an error
351 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100352 "Error uninstalling release {}: {}".format(
353 kdu_instance, e
354 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000355 )
356 else:
357 msg = (
358 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
Pedro Escaleirab41de172022-04-02 00:44:08 +0100359 ).format(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000360 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100361 uninstall_sw = (
362 False # Allow to remove k8s cluster without removing Tiller
363 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000364
365 if uninstall_sw:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100366 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000367
368 # delete cluster directory
Pedro Escaleirab41de172022-04-02 00:44:08 +0100369 self.log.debug("Removing directory {}".format(cluster_uuid))
370 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000371 # Remove also local directorio if still exist
Pedro Escaleirab41de172022-04-02 00:44:08 +0100372 direct = self.fs.path + "/" + cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000373 shutil.rmtree(direct, ignore_errors=True)
374
375 return True
376
garciadeblas04393192022-06-08 15:39:24 +0200377 def _is_helm_chart_a_file(self, chart_name: str):
378 return chart_name.count("/") > 1
379
lloretgalleg095392b2020-11-20 11:28:08 +0000380 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100381 self,
382 cluster_id: str,
383 kdu_model: str,
384 paths: dict,
385 env: dict,
386 kdu_instance: str,
387 atomic: bool = True,
388 timeout: float = 300,
389 params: dict = None,
390 db_dict: dict = None,
391 kdu_name: str = None,
392 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000393 ):
bravof7bd5c6a2021-11-17 11:14:57 -0300394 # init env, paths
395 paths, env = self._init_paths_env(
396 cluster_name=cluster_id, create_if_not_exist=True
397 )
398
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000399 # params to str
400 params_str, file_to_delete = self._params_to_file_option(
401 cluster_id=cluster_id, params=params
402 )
403
404 # version
aktas867418c2021-10-19 18:26:13 +0300405 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000406
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +0100407 _, repo = self._split_repo(kdu_model)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200408 if repo:
limon3c443f52022-07-21 13:55:55 +0200409 await self.repo_update(cluster_id, repo)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200410
garciadeblas82b591c2021-03-24 09:22:13 +0100411 command = self._get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300412 kdu_model,
413 kdu_instance,
414 namespace,
415 params_str,
416 version,
417 atomic,
418 timeout,
419 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100420 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000421
422 self.log.debug("installing: {}".format(command))
423
424 if atomic:
425 # exec helm in a task
426 exec_task = asyncio.ensure_future(
427 coro_or_future=self._local_async_exec(
428 command=command, raise_exception_on_error=False, env=env
429 )
430 )
431
432 # write status in another task
433 status_task = asyncio.ensure_future(
434 coro_or_future=self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000440 )
441 )
442
443 # wait for execution task
444 await asyncio.wait([exec_task])
445
446 # cancel status task
447 status_task.cancel()
448
449 output, rc = exec_task.result()
450
451 else:
452
453 output, rc = await self._local_async_exec(
454 command=command, raise_exception_on_error=False, env=env
455 )
456
457 # remove temporal values yaml file
458 if file_to_delete:
459 os.remove(file_to_delete)
460
461 # write final status
462 await self._store_status(
463 cluster_id=cluster_id,
464 kdu_instance=kdu_instance,
465 namespace=namespace,
466 db_dict=db_dict,
467 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000475 async def upgrade(
476 self,
477 cluster_uuid: str,
478 kdu_instance: str,
479 kdu_model: str = None,
480 atomic: bool = True,
481 timeout: float = 300,
482 params: dict = None,
483 db_dict: dict = None,
484 ):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100485 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000486
487 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100488 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000489
490 # look for instance to obtain namespace
491 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
492 if not instance_info:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance))
494
495 # init env, paths
496 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100497 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000498 )
499
bravof7bd5c6a2021-11-17 11:14:57 -0300500 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100501 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300502
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000503 # params to str
504 params_str, file_to_delete = self._params_to_file_option(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100505 cluster_id=cluster_uuid, params=params
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000506 )
507
508 # version
aktas867418c2021-10-19 18:26:13 +0300509 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000510
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +0100511 _, repo = self._split_repo(kdu_model)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200512 if repo:
limon3c443f52022-07-21 13:55:55 +0200513 await self.repo_update(cluster_uuid, repo)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200514
garciadeblas82b591c2021-03-24 09:22:13 +0100515 command = self._get_upgrade_command(
516 kdu_model,
517 kdu_instance,
518 instance_info["namespace"],
519 params_str,
520 version,
521 atomic,
522 timeout,
bravof7bd5c6a2021-11-17 11:14:57 -0300523 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100524 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000525
526 self.log.debug("upgrading: {}".format(command))
527
528 if atomic:
529
530 # exec helm in a task
531 exec_task = asyncio.ensure_future(
532 coro_or_future=self._local_async_exec(
533 command=command, raise_exception_on_error=False, env=env
534 )
535 )
536 # write status in another task
537 status_task = asyncio.ensure_future(
538 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100539 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000540 kdu_instance=kdu_instance,
541 namespace=instance_info["namespace"],
542 db_dict=db_dict,
543 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000544 )
545 )
546
547 # wait for execution task
548 await asyncio.wait([exec_task])
549
550 # cancel status task
551 status_task.cancel()
552 output, rc = exec_task.result()
553
554 else:
555
556 output, rc = await self._local_async_exec(
557 command=command, raise_exception_on_error=False, env=env
558 )
559
560 # remove temporal values yaml file
561 if file_to_delete:
562 os.remove(file_to_delete)
563
564 # write final status
565 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100566 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000567 kdu_instance=kdu_instance,
568 namespace=instance_info["namespace"],
569 db_dict=db_dict,
570 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000571 )
572
573 if rc != 0:
574 msg = "Error executing command: {}\nOutput: {}".format(command, output)
575 self.log.error(msg)
576 raise K8sException(msg)
577
578 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100579 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000580
581 # return new revision number
582 instance = await self.get_instance_info(
583 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
584 )
585 if instance:
586 revision = int(instance.get("revision"))
587 self.log.debug("New revision: {}".format(revision))
588 return revision
589 else:
590 return 0
591
aktas2962f3e2021-03-15 11:05:35 +0300592 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100593 self,
594 kdu_instance: str,
595 scale: int,
596 resource_name: str,
597 total_timeout: float = 1800,
aktas867418c2021-10-19 18:26:13 +0300598 cluster_uuid: str = None,
599 kdu_model: str = None,
600 atomic: bool = True,
601 db_dict: dict = None,
garciadeblas82b591c2021-03-24 09:22:13 +0100602 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300603 ):
aktas867418c2021-10-19 18:26:13 +0300604 """Scale a resource in a Helm Chart.
605
606 Args:
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
617
618 Returns:
619 True if successful, False otherwise
620 """
621
Pedro Escaleirab41de172022-04-02 00:44:08 +0100622 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300623 if resource_name:
624 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100625 resource_name, kdu_model, cluster_uuid
aktas867418c2021-10-19 18:26:13 +0300626 )
627
628 self.log.debug(debug_mgs)
629
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
633 if not instance_info:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance))
635
636 # init env, paths
637 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100638 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300639 )
640
641 # version
642 kdu_model, version = self._split_version(kdu_model)
643
644 repo_url = await self._find_repo(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300645
646 _, replica_str = await self._get_replica_count_url(
647 kdu_model, repo_url, resource_name
648 )
649
650 command = self._get_upgrade_scale_command(
651 kdu_model,
652 kdu_instance,
653 instance_info["namespace"],
654 scale,
655 version,
656 atomic,
657 replica_str,
658 total_timeout,
659 resource_name,
660 paths["kube_config"],
661 )
662
663 self.log.debug("scaling: {}".format(command))
664
665 if atomic:
666 # exec helm in a task
667 exec_task = asyncio.ensure_future(
668 coro_or_future=self._local_async_exec(
669 command=command, raise_exception_on_error=False, env=env
670 )
671 )
672 # write status in another task
673 status_task = asyncio.ensure_future(
674 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100675 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300676 kdu_instance=kdu_instance,
677 namespace=instance_info["namespace"],
678 db_dict=db_dict,
679 operation="scale",
aktas867418c2021-10-19 18:26:13 +0300680 )
681 )
682
683 # wait for execution task
684 await asyncio.wait([exec_task])
685
686 # cancel status task
687 status_task.cancel()
688 output, rc = exec_task.result()
689
690 else:
691 output, rc = await self._local_async_exec(
692 command=command, raise_exception_on_error=False, env=env
693 )
694
695 # write final status
696 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100697 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300698 kdu_instance=kdu_instance,
699 namespace=instance_info["namespace"],
700 db_dict=db_dict,
701 operation="scale",
aktas867418c2021-10-19 18:26:13 +0300702 )
703
704 if rc != 0:
705 msg = "Error executing command: {}\nOutput: {}".format(command, output)
706 self.log.error(msg)
707 raise K8sException(msg)
708
709 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100710 self.fs.reverse_sync(from_path=cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300711
712 return True
aktas2962f3e2021-03-15 11:05:35 +0300713
714 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100715 self,
716 resource_name: str,
717 kdu_instance: str,
aktas867418c2021-10-19 18:26:13 +0300718 cluster_uuid: str,
719 kdu_model: str,
garciadeblas82b591c2021-03-24 09:22:13 +0100720 **kwargs,
aktas867418c2021-10-19 18:26:13 +0300721 ) -> int:
722 """Get a resource scale count.
723
724 Args:
725 cluster_uuid: The UUID of the cluster
726 resource_name: Resource name
727 kdu_instance: KDU instance name
Pedro Escaleira547f8232022-06-03 19:48:46 +0100728 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +0300729 kwargs: Additional parameters
730
731 Returns:
732 Resource instance count
733 """
734
aktas867418c2021-10-19 18:26:13 +0300735 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100736 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300737 )
738
739 # look for instance to obtain namespace
740 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
741 if not instance_info:
742 raise K8sException("kdu_instance {} not found".format(kdu_instance))
743
744 # init env, paths
745 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100746 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300747 )
748
749 replicas = await self._get_replica_count_instance(
Pedro Escaleiraaa5deb72022-06-05 01:29:57 +0100750 kdu_instance=kdu_instance,
751 namespace=instance_info["namespace"],
752 kubeconfig=paths["kube_config"],
753 resource_name=resource_name,
aktas867418c2021-10-19 18:26:13 +0300754 )
755
756 # Get default value if scale count is not found from provided values
757 if not replicas:
Pedro Escaleira547f8232022-06-03 19:48:46 +0100758 repo_url = await self._find_repo(
759 kdu_model=kdu_model, cluster_uuid=cluster_uuid
760 )
aktas867418c2021-10-19 18:26:13 +0300761 replicas, _ = await self._get_replica_count_url(
Pedro Escaleira547f8232022-06-03 19:48:46 +0100762 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
aktas867418c2021-10-19 18:26:13 +0300763 )
764
765 if not replicas:
766 msg = "Replica count not found. Cannot be scaled"
767 self.log.error(msg)
768 raise K8sException(msg)
769
770 return int(replicas)
aktas2962f3e2021-03-15 11:05:35 +0300771
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000772 async def rollback(
773 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
774 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000775 self.log.debug(
776 "rollback kdu_instance {} to revision {} from cluster {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100777 kdu_instance, revision, cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000778 )
779 )
780
781 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100782 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000783
784 # look for instance to obtain namespace
785 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
786 if not instance_info:
787 raise K8sException("kdu_instance {} not found".format(kdu_instance))
788
789 # init env, paths
790 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100791 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000792 )
793
bravof7bd5c6a2021-11-17 11:14:57 -0300794 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100795 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300796
garciadeblas82b591c2021-03-24 09:22:13 +0100797 command = self._get_rollback_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300798 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
garciadeblas82b591c2021-03-24 09:22:13 +0100799 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000800
801 self.log.debug("rolling_back: {}".format(command))
802
803 # exec helm in a task
804 exec_task = asyncio.ensure_future(
805 coro_or_future=self._local_async_exec(
806 command=command, raise_exception_on_error=False, env=env
807 )
808 )
809 # write status in another task
810 status_task = asyncio.ensure_future(
811 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100812 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000813 kdu_instance=kdu_instance,
814 namespace=instance_info["namespace"],
815 db_dict=db_dict,
816 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000817 )
818 )
819
820 # wait for execution task
821 await asyncio.wait([exec_task])
822
823 # cancel status task
824 status_task.cancel()
825
826 output, rc = exec_task.result()
827
828 # write final status
829 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100830 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000831 kdu_instance=kdu_instance,
832 namespace=instance_info["namespace"],
833 db_dict=db_dict,
834 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000835 )
836
837 if rc != 0:
838 msg = "Error executing command: {}\nOutput: {}".format(command, output)
839 self.log.error(msg)
840 raise K8sException(msg)
841
842 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100843 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000844
845 # return new revision number
846 instance = await self.get_instance_info(
847 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
848 )
849 if instance:
850 revision = int(instance.get("revision"))
851 self.log.debug("New revision: {}".format(revision))
852 return revision
853 else:
854 return 0
855
David Garciaeb8943a2021-04-12 12:07:37 +0200856 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000857 """
858 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
859 (this call should happen after all _terminate-config-primitive_ of the VNF
860 are invoked).
861
862 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
863 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200864 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000865 :return: True if successful
866 """
867
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000868 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100869 "uninstall kdu_instance {} from cluster {}".format(
870 kdu_instance, cluster_uuid
871 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000872 )
873
874 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100875 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000876
877 # look for instance to obtain namespace
878 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
879 if not instance_info:
David Garcia7add1872021-08-18 14:52:52 +0200880 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
881 return True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000882 # init env, paths
883 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100884 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000885 )
886
bravof7bd5c6a2021-11-17 11:14:57 -0300887 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100888 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300889
890 command = self._get_uninstall_command(
891 kdu_instance, instance_info["namespace"], paths["kube_config"]
892 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000893 output, _rc = await self._local_async_exec(
894 command=command, raise_exception_on_error=True, env=env
895 )
896
897 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100898 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000899
900 return self._output_to_table(output)
901
902 async def instances_list(self, cluster_uuid: str) -> list:
903 """
904 returns a list of deployed releases in a cluster
905
906 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
907 :return:
908 """
909
Pedro Escaleirab41de172022-04-02 00:44:08 +0100910 self.log.debug("list releases for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000911
912 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100913 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000914
915 # execute internal command
Pedro Escaleirab41de172022-04-02 00:44:08 +0100916 result = await self._instances_list(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000917
918 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100919 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000920
921 return result
922
923 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
924 instances = await self.instances_list(cluster_uuid=cluster_uuid)
925 for instance in instances:
926 if instance.get("name") == kdu_instance:
927 return instance
928 self.log.debug("Instance {} not found".format(kdu_instance))
929 return None
930
aticig8070c3c2022-04-18 00:31:42 +0300931 async def upgrade_charm(
932 self,
933 ee_id: str = None,
934 path: str = None,
935 charm_id: str = None,
936 charm_type: str = None,
937 timeout: float = None,
938 ) -> str:
939 """This method upgrade charms in VNFs
940
941 Args:
942 ee_id: Execution environment id
943 path: Local path to the charm
944 charm_id: charm-id
945 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
946 timeout: (Float) Timeout for the ns update operation
947
948 Returns:
949 The output of the update operation if status equals to "completed"
950 """
951 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
952
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000953 async def exec_primitive(
954 self,
955 cluster_uuid: str = None,
956 kdu_instance: str = None,
957 primitive_name: str = None,
958 timeout: float = 300,
959 params: dict = None,
960 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200961 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000962 ) -> str:
963 """Exec primitive (Juju action)
964
965 :param cluster_uuid: The UUID of the cluster or namespace:cluster
966 :param kdu_instance: The unique name of the KDU instance
967 :param primitive_name: Name of action that will be executed
968 :param timeout: Timeout for action execution
969 :param params: Dictionary of all the parameters needed for the action
970 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200971 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000972
973 :return: Returns the output of the action
974 """
975 raise K8sException(
976 "KDUs deployed with Helm don't support actions "
977 "different from rollback, upgrade and status"
978 )
979
garciadeblas82b591c2021-03-24 09:22:13 +0100980 async def get_services(
981 self, cluster_uuid: str, kdu_instance: str, namespace: str
982 ) -> list:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000983 """
984 Returns a list of services defined for the specified kdu instance.
985
986 :param cluster_uuid: UUID of a K8s cluster known by OSM
987 :param kdu_instance: unique name for the KDU instance
988 :param namespace: K8s namespace used by the KDU instance
989 :return: If successful, it will return a list of services, Each service
990 can have the following data:
991 - `name` of the service
992 - `type` type of service in the k8 cluster
993 - `ports` List of ports offered by the service, for each port includes at least
994 name, port, protocol
995 - `cluster_ip` Internal ip to be used inside k8s cluster
996 - `external_ip` List of external ips (in case they are available)
997 """
998
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000999 self.log.debug(
1000 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1001 cluster_uuid, kdu_instance
1002 )
1003 )
1004
bravof7bd5c6a2021-11-17 11:14:57 -03001005 # init env, paths
1006 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001007 cluster_name=cluster_uuid, create_if_not_exist=True
bravof7bd5c6a2021-11-17 11:14:57 -03001008 )
1009
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001010 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001011 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001012
1013 # get list of services names for kdu
bravof7bd5c6a2021-11-17 11:14:57 -03001014 service_names = await self._get_services(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001015 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
bravof7bd5c6a2021-11-17 11:14:57 -03001016 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001017
1018 service_list = []
1019 for service in service_names:
Pedro Escaleirab41de172022-04-02 00:44:08 +01001020 service = await self._get_service(cluster_uuid, service, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001021 service_list.append(service)
1022
1023 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001024 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001025
1026 return service_list
1027
garciadeblas82b591c2021-03-24 09:22:13 +01001028 async def get_service(
1029 self, cluster_uuid: str, service_name: str, namespace: str
1030 ) -> object:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001031
1032 self.log.debug(
1033 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
garciadeblas82b591c2021-03-24 09:22:13 +01001034 service_name, namespace, cluster_uuid
1035 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001036 )
1037
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001038 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001039 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001040
Pedro Escaleirab41de172022-04-02 00:44:08 +01001041 service = await self._get_service(cluster_uuid, service_name, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001042
1043 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001044 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001045
1046 return service
1047
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001048 async def status_kdu(
1049 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1050 ) -> Union[str, dict]:
David Garciaeb8943a2021-04-12 12:07:37 +02001051 """
1052 This call would retrieve tha current state of a given KDU instance. It would be
1053 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1054 values_ of the configuration parameters applied to a given instance. This call
1055 would be based on the `status` call.
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001056
David Garciaeb8943a2021-04-12 12:07:37 +02001057 :param cluster_uuid: UUID of a K8s cluster known by OSM
1058 :param kdu_instance: unique name for the KDU instance
1059 :param kwargs: Additional parameters (None yet)
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001060 :param yaml_format: if the return shall be returned as an YAML string or as a
1061 dictionary
David Garciaeb8943a2021-04-12 12:07:37 +02001062 :return: If successful, it will return the following vector of arguments:
1063 - K8s `namespace` in the cluster where the KDU lives
1064 - `state` of the KDU instance. It can be:
1065 - UNKNOWN
1066 - DEPLOYED
1067 - DELETED
1068 - SUPERSEDED
1069 - FAILED or
1070 - DELETING
1071 - List of `resources` (objects) that this release consists of, sorted by kind,
1072 and the status of those resources
1073 - Last `deployment_time`.
1074
1075 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001076 self.log.debug(
1077 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1078 cluster_uuid, kdu_instance
1079 )
1080 )
1081
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001082 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001083 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001084
1085 # get instance: needed to obtain namespace
Pedro Escaleirab41de172022-04-02 00:44:08 +01001086 instances = await self._instances_list(cluster_id=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001087 for instance in instances:
1088 if instance.get("name") == kdu_instance:
1089 break
1090 else:
1091 # instance does not exist
garciadeblas82b591c2021-03-24 09:22:13 +01001092 raise K8sException(
1093 "Instance name: {} not found in cluster: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001094 kdu_instance, cluster_uuid
garciadeblas82b591c2021-03-24 09:22:13 +01001095 )
1096 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001097
1098 status = await self._status_kdu(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001099 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001100 kdu_instance=kdu_instance,
1101 namespace=instance["namespace"],
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001102 yaml_format=yaml_format,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001103 show_error_log=True,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001104 )
1105
1106 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001107 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001108
1109 return status
1110
aktas867418c2021-10-19 18:26:13 +03001111 async def get_values_kdu(
1112 self, kdu_instance: str, namespace: str, kubeconfig: str
1113 ) -> str:
1114
1115 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1116
1117 return await self._exec_get_command(
1118 get_command="values",
1119 kdu_instance=kdu_instance,
1120 namespace=namespace,
1121 kubeconfig=kubeconfig,
1122 )
1123
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001124 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001125 """Method to obtain the Helm Chart package's values
1126
1127 Args:
1128 kdu_model: The name or path of an Helm Chart
1129 repo_url: Helm Chart repository url
1130
1131 Returns:
1132 str: the values of the Helm Chart package
1133 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001134
1135 self.log.debug(
1136 "inspect kdu_model values {} from (optional) repo: {}".format(
1137 kdu_model, repo_url
1138 )
1139 )
1140
aktas867418c2021-10-19 18:26:13 +03001141 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001142 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1143 )
1144
1145 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1146
1147 self.log.debug(
1148 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1149 )
1150
aktas867418c2021-10-19 18:26:13 +03001151 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001152 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1153 )
1154
1155 async def synchronize_repos(self, cluster_uuid: str):
1156
1157 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1158 try:
1159 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1160 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1161
1162 local_repo_list = await self.repo_list(cluster_uuid)
1163 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1164
1165 deleted_repo_list = []
1166 added_repo_dict = {}
1167
1168 # iterate over the list of repos in the database that should be
1169 # added if not present
1170 for repo_name, db_repo in db_repo_dict.items():
1171 try:
1172 # check if it is already present
1173 curr_repo_url = local_repo_dict.get(db_repo["name"])
1174 repo_id = db_repo.get("_id")
1175 if curr_repo_url != db_repo["url"]:
1176 if curr_repo_url:
garciadeblas82b591c2021-03-24 09:22:13 +01001177 self.log.debug(
1178 "repo {} url changed, delete and and again".format(
1179 db_repo["url"]
1180 )
1181 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001182 await self.repo_remove(cluster_uuid, db_repo["name"])
1183 deleted_repo_list.append(repo_id)
1184
1185 # add repo
1186 self.log.debug("add repo {}".format(db_repo["name"]))
bravof0ab522f2021-11-23 19:33:18 -03001187 if "ca_cert" in db_repo:
1188 await self.repo_add(
1189 cluster_uuid,
1190 db_repo["name"],
1191 db_repo["url"],
1192 cert=db_repo["ca_cert"],
1193 )
1194 else:
1195 await self.repo_add(
1196 cluster_uuid,
1197 db_repo["name"],
1198 db_repo["url"],
1199 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001200 added_repo_dict[repo_id] = db_repo["name"]
1201 except Exception as e:
1202 raise K8sException(
1203 "Error adding repo id: {}, err_msg: {} ".format(
1204 repo_id, repr(e)
1205 )
1206 )
1207
1208 # Delete repos that are present but not in nbi_list
1209 for repo_name in local_repo_dict:
1210 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1211 self.log.debug("delete repo {}".format(repo_name))
1212 try:
1213 await self.repo_remove(cluster_uuid, repo_name)
1214 deleted_repo_list.append(repo_name)
1215 except Exception as e:
1216 self.warning(
1217 "Error deleting repo, name: {}, err_msg: {}".format(
1218 repo_name, str(e)
1219 )
1220 )
1221
1222 return deleted_repo_list, added_repo_dict
1223
1224 except K8sException:
1225 raise
1226 except Exception as e:
1227 # Do not raise errors synchronizing repos
1228 self.log.error("Error synchronizing repos: {}".format(e))
1229 raise Exception("Error synchronizing repos: {}".format(e))
1230
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001231 def _get_db_repos_dict(self, repo_ids: list):
1232 db_repos_dict = {}
1233 for repo_id in repo_ids:
1234 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1235 db_repos_dict[db_repo["name"]] = db_repo
1236 return db_repos_dict
1237
1238 """
1239 ####################################################################################
1240 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1241 ####################################################################################
1242 """
1243
1244 @abc.abstractmethod
1245 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1246 """
1247 Creates and returns base cluster and kube dirs and returns them.
1248 Also created helm3 dirs according to new directory specification, paths are
1249 not returned but assigned to helm environment variables
1250
1251 :param cluster_name: cluster_name
1252 :return: Dictionary with config_paths and dictionary with helm environment variables
1253 """
1254
1255 @abc.abstractmethod
1256 async def _cluster_init(self, cluster_id, namespace, paths, env):
1257 """
1258 Implements the helm version dependent cluster initialization
1259 """
1260
1261 @abc.abstractmethod
1262 async def _instances_list(self, cluster_id):
1263 """
1264 Implements the helm version dependent helm instances list
1265 """
1266
1267 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001268 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001269 """
1270 Implements the helm version dependent method to obtain services from a helm instance
1271 """
1272
1273 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001274 async def _status_kdu(
1275 self,
1276 cluster_id: str,
1277 kdu_instance: str,
1278 namespace: str = None,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001279 yaml_format: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001280 show_error_log: bool = False,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001281 ) -> Union[str, dict]:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001282 """
1283 Implements the helm version dependent method to obtain status of a helm instance
1284 """
1285
1286 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001287 def _get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001288 self,
1289 kdu_model,
1290 kdu_instance,
1291 namespace,
1292 params_str,
1293 version,
1294 atomic,
1295 timeout,
1296 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001297 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001298 """
1299 Obtain command to be executed to delete the indicated instance
1300 """
1301
1302 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001303 def _get_upgrade_scale_command(
1304 self,
1305 kdu_model,
1306 kdu_instance,
1307 namespace,
1308 count,
1309 version,
1310 atomic,
1311 replicas,
1312 timeout,
1313 resource_name,
1314 kubeconfig,
1315 ) -> str:
1316 """Obtain command to be executed to upgrade the indicated instance."""
1317
1318 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001319 def _get_upgrade_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001320 self,
1321 kdu_model,
1322 kdu_instance,
1323 namespace,
1324 params_str,
1325 version,
1326 atomic,
1327 timeout,
1328 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001329 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001330 """
1331 Obtain command to be executed to upgrade the indicated instance
1332 """
1333
1334 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001335 def _get_rollback_command(
1336 self, kdu_instance, namespace, revision, kubeconfig
1337 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001338 """
1339 Obtain command to be executed to rollback the indicated instance
1340 """
1341
1342 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001343 def _get_uninstall_command(
1344 self, kdu_instance: str, namespace: str, kubeconfig: str
1345 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001346 """
1347 Obtain command to be executed to delete the indicated instance
1348 """
1349
1350 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001351 def _get_inspect_command(
1352 self, show_command: str, kdu_model: str, repo_str: str, version: str
1353 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001354 """Generates the command to obtain the information about an Helm Chart package
1355 (´helm show ...´ command)
1356
1357 Args:
1358 show_command: the second part of the command (`helm show <show_command>`)
1359 kdu_model: The name or path of an Helm Chart
1360 repo_url: Helm Chart repository url
1361 version: constraint with specific version of the Chart to use
1362
1363 Returns:
1364 str: the generated Helm Chart command
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001365 """
1366
1367 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001368 def _get_get_command(
1369 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1370 ):
1371 """Obtain command to be executed to get information about the kdu instance."""
1372
1373 @abc.abstractmethod
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001374 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1375 """
1376 Method call to uninstall cluster software for helm. This method is dependent
1377 of helm version
1378 For Helm v2 it will be called when Tiller must be uninstalled
1379 For Helm v3 it does nothing and does not need to be callled
1380 """
1381
lloretgalleg095392b2020-11-20 11:28:08 +00001382 @abc.abstractmethod
1383 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1384 """
1385 Obtains the cluster repos identifiers
1386 """
1387
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001388 """
1389 ####################################################################################
1390 ################################### P R I V A T E ##################################
1391 ####################################################################################
1392 """
1393
1394 @staticmethod
1395 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1396 if os.path.exists(filename):
1397 return True
1398 else:
1399 msg = "File {} does not exist".format(filename)
1400 if exception_if_not_exists:
1401 raise K8sException(msg)
1402
1403 @staticmethod
1404 def _remove_multiple_spaces(strobj):
1405 strobj = strobj.strip()
1406 while " " in strobj:
1407 strobj = strobj.replace(" ", " ")
1408 return strobj
1409
1410 @staticmethod
1411 def _output_to_lines(output: str) -> list:
1412 output_lines = list()
1413 lines = output.splitlines(keepends=False)
1414 for line in lines:
1415 line = line.strip()
1416 if len(line) > 0:
1417 output_lines.append(line)
1418 return output_lines
1419
1420 @staticmethod
1421 def _output_to_table(output: str) -> list:
1422 output_table = list()
1423 lines = output.splitlines(keepends=False)
1424 for line in lines:
1425 line = line.replace("\t", " ")
1426 line_list = list()
1427 output_table.append(line_list)
1428 cells = line.split(sep=" ")
1429 for cell in cells:
1430 cell = cell.strip()
1431 if len(cell) > 0:
1432 line_list.append(cell)
1433 return output_table
1434
1435 @staticmethod
1436 def _parse_services(output: str) -> list:
1437 lines = output.splitlines(keepends=False)
1438 services = []
1439 for line in lines:
1440 line = line.replace("\t", " ")
1441 cells = line.split(sep=" ")
1442 if len(cells) > 0 and cells[0].startswith("service/"):
1443 elems = cells[0].split(sep="/")
1444 if len(elems) > 1:
1445 services.append(elems[1])
1446 return services
1447
1448 @staticmethod
1449 def _get_deep(dictionary: dict, members: tuple):
1450 target = dictionary
1451 value = None
1452 try:
1453 for m in members:
1454 value = target.get(m)
1455 if not value:
1456 return None
1457 else:
1458 target = value
1459 except Exception:
1460 pass
1461 return value
1462
1463 # find key:value in several lines
1464 @staticmethod
1465 def _find_in_lines(p_lines: list, p_key: str) -> str:
1466 for line in p_lines:
1467 try:
1468 if line.startswith(p_key + ":"):
1469 parts = line.split(":")
1470 the_value = parts[1].strip()
1471 return the_value
1472 except Exception:
1473 # ignore it
1474 pass
1475 return None
1476
1477 @staticmethod
1478 def _lower_keys_list(input_list: list):
1479 """
1480 Transform the keys in a list of dictionaries to lower case and returns a new list
1481 of dictionaries
1482 """
1483 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001484 if input_list:
1485 for dictionary in input_list:
1486 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1487 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001488 return new_list
1489
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001490 async def _local_async_exec(
1491 self,
1492 command: str,
1493 raise_exception_on_error: bool = False,
1494 show_error_log: bool = True,
1495 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001496 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001497 ) -> (str, int):
1498
1499 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001500 self.log.debug(
1501 "Executing async local command: {}, env: {}".format(command, env)
1502 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001503
1504 # split command
1505 command = shlex.split(command)
1506
1507 environ = os.environ.copy()
1508 if env:
1509 environ.update(env)
1510
1511 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001512 async with self.cmd_lock:
1513 process = await asyncio.create_subprocess_exec(
1514 *command,
1515 stdout=asyncio.subprocess.PIPE,
1516 stderr=asyncio.subprocess.PIPE,
1517 env=environ,
1518 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001519
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001520 # wait for command terminate
1521 stdout, stderr = await process.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001522
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001523 return_code = process.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001524
1525 output = ""
1526 if stdout:
1527 output = stdout.decode("utf-8").strip()
1528 # output = stdout.decode()
1529 if stderr:
1530 output = stderr.decode("utf-8").strip()
1531 # output = stderr.decode()
1532
1533 if return_code != 0 and show_error_log:
1534 self.log.debug(
1535 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1536 )
1537 else:
1538 self.log.debug("Return code: {}".format(return_code))
1539
1540 if raise_exception_on_error and return_code != 0:
1541 raise K8sException(output)
1542
1543 if encode_utf8:
1544 output = output.encode("utf-8").strip()
1545 output = str(output).replace("\\n", "\n")
1546
1547 return output, return_code
1548
1549 except asyncio.CancelledError:
Pedro Escaleirad3817992022-07-23 23:34:42 +01001550 # first, kill the process if it is still running
1551 if process.returncode is None:
1552 process.kill()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001553 raise
1554 except K8sException:
1555 raise
1556 except Exception as e:
1557 msg = "Exception executing command: {} -> {}".format(command, e)
1558 self.log.error(msg)
1559 if raise_exception_on_error:
1560 raise K8sException(e) from e
1561 else:
1562 return "", -1
1563
garciadeblas82b591c2021-03-24 09:22:13 +01001564 async def _local_async_exec_pipe(
1565 self,
1566 command1: str,
1567 command2: str,
1568 raise_exception_on_error: bool = True,
1569 show_error_log: bool = True,
1570 encode_utf8: bool = False,
1571 env: dict = None,
1572 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001573
1574 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1575 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1576 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001577 self.log.debug(
1578 "Executing async local command: {}, env: {}".format(command, env)
1579 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001580
1581 # split command
1582 command1 = shlex.split(command1)
1583 command2 = shlex.split(command2)
1584
1585 environ = os.environ.copy()
1586 if env:
1587 environ.update(env)
1588
1589 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001590 async with self.cmd_lock:
1591 read, write = os.pipe()
Pedro Escaleirad3817992022-07-23 23:34:42 +01001592 process_1 = await asyncio.create_subprocess_exec(
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001593 *command1, stdout=write, env=environ
1594 )
1595 os.close(write)
1596 process_2 = await asyncio.create_subprocess_exec(
1597 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1598 )
1599 os.close(read)
1600 stdout, stderr = await process_2.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001601
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001602 return_code = process_2.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001603
1604 output = ""
1605 if stdout:
1606 output = stdout.decode("utf-8").strip()
1607 # output = stdout.decode()
1608 if stderr:
1609 output = stderr.decode("utf-8").strip()
1610 # output = stderr.decode()
1611
1612 if return_code != 0 and show_error_log:
1613 self.log.debug(
1614 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1615 )
1616 else:
1617 self.log.debug("Return code: {}".format(return_code))
1618
1619 if raise_exception_on_error and return_code != 0:
1620 raise K8sException(output)
1621
1622 if encode_utf8:
1623 output = output.encode("utf-8").strip()
1624 output = str(output).replace("\\n", "\n")
1625
1626 return output, return_code
1627 except asyncio.CancelledError:
Pedro Escaleirad3817992022-07-23 23:34:42 +01001628 # first, kill the processes if they are still running
1629 for process in (process_1, process_2):
1630 if process.returncode is None:
1631 process.kill()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001632 raise
1633 except K8sException:
1634 raise
1635 except Exception as e:
1636 msg = "Exception executing command: {} -> {}".format(command, e)
1637 self.log.error(msg)
1638 if raise_exception_on_error:
1639 raise K8sException(e) from e
1640 else:
1641 return "", -1
1642
1643 async def _get_service(self, cluster_id, service_name, namespace):
1644 """
1645 Obtains the data of the specified service in the k8cluster.
1646
1647 :param cluster_id: id of a K8s cluster known by OSM
1648 :param service_name: name of the K8s service in the specified namespace
1649 :param namespace: K8s namespace used by the KDU instance
1650 :return: If successful, it will return a service with the following data:
1651 - `name` of the service
1652 - `type` type of service in the k8 cluster
1653 - `ports` List of ports offered by the service, for each port includes at least
1654 name, port, protocol
1655 - `cluster_ip` Internal ip to be used inside k8s cluster
1656 - `external_ip` List of external ips (in case they are available)
1657 """
1658
1659 # init config, env
1660 paths, env = self._init_paths_env(
1661 cluster_name=cluster_id, create_if_not_exist=True
1662 )
1663
1664 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1665 self.kubectl_command, paths["kube_config"], namespace, service_name
1666 )
1667
1668 output, _rc = await self._local_async_exec(
1669 command=command, raise_exception_on_error=True, env=env
1670 )
1671
1672 data = yaml.load(output, Loader=yaml.SafeLoader)
1673
1674 service = {
1675 "name": service_name,
1676 "type": self._get_deep(data, ("spec", "type")),
1677 "ports": self._get_deep(data, ("spec", "ports")),
garciadeblas82b591c2021-03-24 09:22:13 +01001678 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001679 }
1680 if service["type"] == "LoadBalancer":
1681 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1682 ip_list = [elem["ip"] for elem in ip_map_list]
1683 service["external_ip"] = ip_list
1684
1685 return service
1686
aktas867418c2021-10-19 18:26:13 +03001687 async def _exec_get_command(
1688 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1689 ):
1690 """Obtains information about the kdu instance."""
1691
1692 full_command = self._get_get_command(
1693 get_command, kdu_instance, namespace, kubeconfig
1694 )
1695
1696 output, _rc = await self._local_async_exec(command=full_command)
1697
1698 return output
1699
1700 async def _exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001701 self, inspect_command: str, kdu_model: str, repo_url: str = None
1702 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001703 """Obtains information about an Helm Chart package (´helm show´ command)
1704
1705 Args:
1706 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1707 kdu_model: The name or path of an Helm Chart
1708 repo_url: Helm Chart repository url
1709
1710 Returns:
1711 str: the requested info about the Helm Chart package
1712 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001713
1714 repo_str = ""
1715 if repo_url:
1716 repo_str = " --repo {}".format(repo_url)
1717
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01001718 # Obtain the Chart's name and store it in the var kdu_model
1719 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001720
aktas867418c2021-10-19 18:26:13 +03001721 kdu_model, version = self._split_version(kdu_model)
1722 if version:
1723 version_str = "--version {}".format(version)
1724 else:
1725 version_str = ""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001726
garciadeblas82b591c2021-03-24 09:22:13 +01001727 full_command = self._get_inspect_command(
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01001728 show_command=inspect_command,
1729 kdu_model=kdu_model,
1730 repo_str=repo_str,
1731 version=version_str,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001732 )
1733
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01001734 output, _ = await self._local_async_exec(command=full_command)
aktas867418c2021-10-19 18:26:13 +03001735
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001736 return output
1737
aktas867418c2021-10-19 18:26:13 +03001738 async def _get_replica_count_url(
1739 self,
1740 kdu_model: str,
Pedro Escaleira547f8232022-06-03 19:48:46 +01001741 repo_url: str = None,
aktas867418c2021-10-19 18:26:13 +03001742 resource_name: str = None,
1743 ):
1744 """Get the replica count value in the Helm Chart Values.
1745
1746 Args:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001747 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +03001748 repo_url: Helm Chart repository url
1749 resource_name: Resource name
1750
1751 Returns:
1752 True if replicas, False replicaCount
1753 """
1754
1755 kdu_values = yaml.load(
Pedro Escaleira547f8232022-06-03 19:48:46 +01001756 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1757 Loader=yaml.SafeLoader,
aktas867418c2021-10-19 18:26:13 +03001758 )
1759
1760 if not kdu_values:
1761 raise K8sException(
1762 "kdu_values not found for kdu_model {}".format(kdu_model)
1763 )
1764
1765 if resource_name:
1766 kdu_values = kdu_values.get(resource_name, None)
1767
1768 if not kdu_values:
1769 msg = "resource {} not found in the values in model {}".format(
1770 resource_name, kdu_model
1771 )
1772 self.log.error(msg)
1773 raise K8sException(msg)
1774
1775 duplicate_check = False
1776
1777 replica_str = ""
1778 replicas = None
1779
1780 if kdu_values.get("replicaCount", None):
1781 replicas = kdu_values["replicaCount"]
1782 replica_str = "replicaCount"
1783 elif kdu_values.get("replicas", None):
1784 duplicate_check = True
1785 replicas = kdu_values["replicas"]
1786 replica_str = "replicas"
1787 else:
1788 if resource_name:
1789 msg = (
1790 "replicaCount or replicas not found in the resource"
1791 "{} values in model {}. Cannot be scaled".format(
1792 resource_name, kdu_model
1793 )
1794 )
1795 else:
1796 msg = (
1797 "replicaCount or replicas not found in the values"
1798 "in model {}. Cannot be scaled".format(kdu_model)
1799 )
1800 self.log.error(msg)
1801 raise K8sException(msg)
1802
1803 # Control if replicas and replicaCount exists at the same time
1804 msg = "replicaCount and replicas are exists at the same time"
1805 if duplicate_check:
1806 if "replicaCount" in kdu_values:
1807 self.log.error(msg)
1808 raise K8sException(msg)
1809 else:
1810 if "replicas" in kdu_values:
1811 self.log.error(msg)
1812 raise K8sException(msg)
1813
1814 return replicas, replica_str
1815
1816 async def _get_replica_count_instance(
1817 self,
1818 kdu_instance: str,
1819 namespace: str,
1820 kubeconfig: str,
1821 resource_name: str = None,
1822 ):
1823 """Get the replica count value in the instance.
1824
1825 Args:
1826 kdu_instance: The name of the KDU instance
1827 namespace: KDU instance namespace
1828 kubeconfig:
1829 resource_name: Resource name
1830
1831 Returns:
1832 True if replicas, False replicaCount
1833 """
1834
1835 kdu_values = yaml.load(
1836 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1837 Loader=yaml.SafeLoader,
1838 )
1839
1840 replicas = None
1841
1842 if kdu_values:
1843 resource_values = (
1844 kdu_values.get(resource_name, None) if resource_name else None
1845 )
1846 replicas = (
1847 (
1848 resource_values.get("replicaCount", None)
1849 or resource_values.get("replicas", None)
1850 )
1851 if resource_values
1852 else (
1853 kdu_values.get("replicaCount", None)
1854 or kdu_values.get("replicas", None)
1855 )
1856 )
1857
1858 return replicas
1859
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001860 async def _store_status(
1861 self,
1862 cluster_id: str,
1863 operation: str,
1864 kdu_instance: str,
1865 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001866 db_dict: dict = None,
Pedro Escaleirab46f88d2022-04-23 19:55:45 +01001867 ) -> None:
1868 """
1869 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1870
1871 :param cluster_id (str): the cluster where the KDU instance is deployed
1872 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1873 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1874 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1875 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1876 values for the keys:
1877 - "collection": The Mongo DB collection to write to
1878 - "filter": The query filter to use in the update process
1879 - "path": The dot separated keys which targets the object to be updated
1880 Defaults to None.
1881 """
1882
1883 try:
1884 detailed_status = await self._status_kdu(
1885 cluster_id=cluster_id,
1886 kdu_instance=kdu_instance,
1887 yaml_format=False,
1888 namespace=namespace,
1889 )
1890
1891 status = detailed_status.get("info").get("description")
1892 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1893
1894 # write status to db
1895 result = await self.write_app_status_to_db(
1896 db_dict=db_dict,
1897 status=str(status),
1898 detailed_status=str(detailed_status),
1899 operation=operation,
1900 )
1901
1902 if not result:
1903 self.log.info("Error writing in database. Task exiting...")
1904
1905 except asyncio.CancelledError as e:
1906 self.log.warning(
1907 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1908 )
1909 except Exception as e:
1910 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001911
1912 # params for use in -f file
1913 # returns values file option and filename (in order to delete it at the end)
1914 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1915
1916 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001917 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001918
1919 def get_random_number():
1920 r = random.randrange(start=1, stop=99999999)
1921 s = str(r)
1922 while len(s) < 10:
1923 s = "0" + s
1924 return s
1925
1926 params2 = dict()
1927 for key in params:
1928 value = params.get(key)
1929 if "!!yaml" in str(value):
David Garcia513cb2d2022-05-31 11:01:09 +02001930 value = yaml.safe_load(value[7:])
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001931 params2[key] = value
1932
1933 values_file = get_random_number() + ".yaml"
1934 with open(values_file, "w") as stream:
1935 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1936
1937 return "-f {}".format(values_file), values_file
1938
1939 return "", None
1940
1941 # params for use in --set option
1942 @staticmethod
1943 def _params_to_set_option(params: dict) -> str:
1944 params_str = ""
1945 if params and len(params) > 0:
1946 start = True
1947 for key in params:
1948 value = params.get(key, None)
1949 if value is not None:
1950 if start:
1951 params_str += "--set "
1952 start = False
1953 else:
1954 params_str += ","
1955 params_str += "{}={}".format(key, value)
1956 return params_str
1957
1958 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01001959 def generate_kdu_instance_name(**kwargs):
1960 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001961 # check embeded chart (file or dir)
1962 if chart_name.startswith("/"):
1963 # extract file or directory name
David Garcia4ae527e2021-07-26 16:04:59 +02001964 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001965 # check URL
1966 elif "://" in chart_name:
1967 # extract last portion of URL
David Garcia4ae527e2021-07-26 16:04:59 +02001968 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001969
1970 name = ""
1971 for c in chart_name:
1972 if c.isalpha() or c.isnumeric():
1973 name += c
1974 else:
1975 name += "-"
1976 if len(name) > 35:
1977 name = name[0:35]
1978
1979 # if does not start with alpha character, prefix 'a'
1980 if not name[0].isalpha():
1981 name = "a" + name
1982
1983 name += "-"
1984
1985 def get_random_number():
1986 r = random.randrange(start=1, stop=99999999)
1987 s = str(r)
1988 s = s.rjust(10, "0")
1989 return s
1990
1991 name = name + get_random_number()
1992 return name.lower()
aktas867418c2021-10-19 18:26:13 +03001993
1994 def _split_version(self, kdu_model: str) -> (str, str):
1995 version = None
garciadeblas04393192022-06-08 15:39:24 +02001996 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
aktas867418c2021-10-19 18:26:13 +03001997 parts = kdu_model.split(sep=":")
1998 if len(parts) == 2:
1999 version = str(parts[1])
2000 kdu_model = parts[0]
2001 return kdu_model, version
2002
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002003 def _split_repo(self, kdu_model: str) -> (str, str):
2004 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2005
2006 Args:
2007 kdu_model (str): Associated KDU model
2008
2009 Returns:
2010 (str, str): Tuple with the Chart name in index 0, and the repo name
2011 in index 2; if there was a problem finding them, return None
2012 for both
2013 """
2014
2015 chart_name = None
garciadeblas7faf4ec2022-04-08 22:53:25 +02002016 repo_name = None
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002017
garciadeblas7faf4ec2022-04-08 22:53:25 +02002018 idx = kdu_model.find("/")
2019 if idx >= 0:
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002020 chart_name = kdu_model[idx + 1 :]
garciadeblas7faf4ec2022-04-08 22:53:25 +02002021 repo_name = kdu_model[:idx]
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002022
2023 return chart_name, repo_name
garciadeblas7faf4ec2022-04-08 22:53:25 +02002024
aktas867418c2021-10-19 18:26:13 +03002025 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01002026 """Obtain the Helm repository for an Helm Chart
2027
2028 Args:
2029 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2030 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2031
2032 Returns:
2033 str: the repository URL; if Helm Chart is a local one, the function returns None
2034 """
2035
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002036 _, repo_name = self._split_repo(kdu_model=kdu_model)
2037
aktas867418c2021-10-19 18:26:13 +03002038 repo_url = None
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002039 if repo_name:
aktas867418c2021-10-19 18:26:13 +03002040 # Find repository link
2041 local_repo_list = await self.repo_list(cluster_uuid)
2042 for repo in local_repo_list:
Pedro Escaleira0fcb6fe2022-06-04 19:14:11 +01002043 if repo["name"] == repo_name:
2044 repo_url = repo["url"]
2045 break # it is not necessary to continue the loop if the repo link was found...
2046
aktas867418c2021-10-19 18:26:13 +03002047 return repo_url