blob: 5985066c9e7e3399e591be627ad12b6bd8c4490a [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +010024from typing import Union
lloretgalleg1c83f2e2020-10-22 09:12:35 +000025import random
26import time
27import shlex
28import shutil
29import stat
lloretgalleg1c83f2e2020-10-22 09:12:35 +000030import os
31import yaml
32from uuid import uuid4
33
David Garcia4395cfa2021-05-28 16:21:51 +020034from n2vc.config import EnvironConfig
lloretgalleg1c83f2e2020-10-22 09:12:35 +000035from n2vc.exceptions import K8sException
36from n2vc.k8s_conn import K8sConnector
37
38
39class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
garciadeblas82b591c2021-03-24 09:22:13 +010046
lloretgalleg1c83f2e2020-10-22 09:12:35 +000047 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
David Garcia4395cfa2021-05-28 16:21:51 +020073 self.config = EnvironConfig()
lloretgalleg1c83f2e2020-10-22 09:12:35 +000074 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
David Garcia4395cfa2021-05-28 16:21:51 +020089 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
lloretgalleg83e55892020-12-17 12:42:11 +000092
Pedro Escaleira1f222a92022-06-20 15:40:43 +010093 # Lock to avoid concurrent execution of helm commands
94 self.cmd_lock = asyncio.Lock()
95
Pedro Escaleirab41de172022-04-02 00:44:08 +010096 def _get_namespace(self, cluster_uuid: str) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +000097 """
Pedro Escaleirab41de172022-04-02 00:44:08 +010098 Obtains the namespace used by the cluster with the uuid passed by argument
99
100 param: cluster_uuid: cluster's uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000101 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100102
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster = self.db.get_one(
105 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
106 )
107 return k8scluster.get("namespace")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000108
109 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100110 self,
111 k8s_creds: str,
112 namespace: str = "kube-system",
113 reuse_cluster_uuid=None,
114 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000115 ) -> (str, bool):
116 """
117 It prepares a given K8s cluster environment to run Charts
118
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
120 '.kube/config'
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200124 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
128 """
129
130 if reuse_cluster_uuid:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100131 cluster_id = reuse_cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000132 else:
133 cluster_id = str(uuid4())
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000134
garciadeblas82b591c2021-03-24 09:22:13 +0100135 self.log.debug(
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
137 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000138
139 paths, env = self._init_paths_env(
140 cluster_name=cluster_id, create_if_not_exist=True
141 )
142 mode = stat.S_IRUSR | stat.S_IWUSR
143 with open(paths["kube_config"], "w", mode) as f:
144 f.write(k8s_creds)
145 os.chmod(paths["kube_config"], 0o600)
146
147 # Code with initialization specific of helm version
148 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
149
150 # sync fs with local data
151 self.fs.reverse_sync(from_path=cluster_id)
152
153 self.log.info("Cluster {} initialized".format(cluster_id))
154
Pedro Escaleirab41de172022-04-02 00:44:08 +0100155 return cluster_id, n2vc_installed_sw
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000156
157 async def repo_add(
bravof0ab522f2021-11-23 19:33:18 -0300158 self,
159 cluster_uuid: str,
160 name: str,
161 url: str,
162 repo_type: str = "chart",
163 cert: str = None,
164 user: str = None,
165 password: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000166 ):
garciadeblas82b591c2021-03-24 09:22:13 +0100167 self.log.debug(
168 "Cluster {}, adding {} repository {}. URL: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100169 cluster_uuid, repo_type, name, url
garciadeblas82b591c2021-03-24 09:22:13 +0100170 )
171 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000172
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000173 # init_env
174 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100175 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000176 )
177
bravof7bd5c6a2021-11-17 11:14:57 -0300178 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100179 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300180
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000181 # helm repo add name url
bravof0ab522f2021-11-23 19:33:18 -0300182 command = ("env KUBECONFIG={} {} repo add {} {}").format(
bravof7bd5c6a2021-11-17 11:14:57 -0300183 paths["kube_config"], self._helm_command, name, url
184 )
bravof0ab522f2021-11-23 19:33:18 -0300185
186 if cert:
187 temp_cert_file = os.path.join(
Pedro Escaleira1188b5d2022-04-22 18:51:00 +0100188 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
bravof0ab522f2021-11-23 19:33:18 -0300189 )
190 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
191 with open(temp_cert_file, "w") as the_cert:
192 the_cert.write(cert)
193 command += " --ca-file {}".format(temp_cert_file)
194
195 if user:
196 command += " --username={}".format(user)
197
198 if password:
199 command += " --password={}".format(password)
200
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000201 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100202 await self._local_async_exec(
203 command=command, raise_exception_on_error=True, env=env
204 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000205
garciadeblasd4cee8c2022-05-04 10:57:36 +0200206 # helm repo update
garciadeblas069f0a32022-05-04 11:07:41 +0200207 command = "env KUBECONFIG={} {} repo update {}".format(
208 paths["kube_config"], self._helm_command, name
garciadeblasd4cee8c2022-05-04 10:57:36 +0200209 )
210 self.log.debug("updating repo: {}".format(command))
211 await self._local_async_exec(
212 command=command, raise_exception_on_error=False, env=env
213 )
214
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000215 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100216 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000217
garciadeblas7faf4ec2022-04-08 22:53:25 +0200218 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
219 self.log.debug(
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid, repo_type, name
222 )
223 )
224
225 # init_env
226 paths, env = self._init_paths_env(
227 cluster_name=cluster_uuid, create_if_not_exist=True
228 )
229
230 # sync local dir
231 self.fs.sync(from_path=cluster_uuid)
232
233 # helm repo update
234 command = "{} repo update {}".format(self._helm_command, name)
235 self.log.debug("updating repo: {}".format(command))
236 await self._local_async_exec(
237 command=command, raise_exception_on_error=False, env=env
238 )
239
240 # sync fs
241 self.fs.reverse_sync(from_path=cluster_uuid)
242
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000243 async def repo_list(self, cluster_uuid: str) -> list:
244 """
245 Get the list of registered repositories
246
247 :return: list of registered repositories: [ (name, url) .... ]
248 """
249
Pedro Escaleirab41de172022-04-02 00:44:08 +0100250 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000251
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000252 # config filename
253 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100254 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000255 )
256
bravof7bd5c6a2021-11-17 11:14:57 -0300257 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100258 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300259
260 command = "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths["kube_config"], self._helm_command
262 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000263
264 # Set exception to false because if there are no repos just want an empty list
265 output, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=False, env=env
267 )
268
269 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100270 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000271
272 if _rc == 0:
273 if output and len(output) > 0:
274 repos = yaml.load(output, Loader=yaml.SafeLoader)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self._lower_keys_list(repos)
277 else:
278 return []
279 else:
280 return []
281
282 async def repo_remove(self, cluster_uuid: str, name: str):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100283 self.log.debug(
284 "remove {} repositories for cluster {}".format(name, cluster_uuid)
285 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000286
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000287 # init env, paths
288 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100289 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000290 )
291
bravof7bd5c6a2021-11-17 11:14:57 -0300292 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100293 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300294
295 command = "env KUBECONFIG={} {} repo remove {}".format(
296 paths["kube_config"], self._helm_command, name
297 )
garciadeblas82b591c2021-03-24 09:22:13 +0100298 await self._local_async_exec(
299 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000300 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000301
302 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100303 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000304
305 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100306 self,
307 cluster_uuid: str,
308 force: bool = False,
309 uninstall_sw: bool = False,
310 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000311 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200312 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000313
David Garciaeb8943a2021-04-12 12:07:37 +0200314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
315
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to force the reset
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
321 """
Pedro Escaleirab41de172022-04-02 00:44:08 +0100322 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100323 self.log.debug(
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100325 cluster_uuid, uninstall_sw
garciadeblas82b591c2021-03-24 09:22:13 +0100326 )
327 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000328
329 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100330 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000331
332 # uninstall releases if needed.
333 if uninstall_sw:
334 releases = await self.instances_list(cluster_uuid=cluster_uuid)
335 if len(releases) > 0:
336 if force:
337 for r in releases:
338 try:
339 kdu_instance = r.get("name")
340 chart = r.get("chart")
341 self.log.debug(
342 "Uninstalling {} -> {}".format(chart, kdu_instance)
343 )
344 await self.uninstall(
345 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
346 )
347 except Exception as e:
348 # will not raise exception as it was found
349 # that in some cases of previously installed helm releases it
350 # raised an error
351 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100352 "Error uninstalling release {}: {}".format(
353 kdu_instance, e
354 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000355 )
356 else:
357 msg = (
358 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
Pedro Escaleirab41de172022-04-02 00:44:08 +0100359 ).format(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000360 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100361 uninstall_sw = (
362 False # Allow to remove k8s cluster without removing Tiller
363 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000364
365 if uninstall_sw:
Pedro Escaleirab41de172022-04-02 00:44:08 +0100366 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000367
368 # delete cluster directory
Pedro Escaleirab41de172022-04-02 00:44:08 +0100369 self.log.debug("Removing directory {}".format(cluster_uuid))
370 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000371 # Remove also local directorio if still exist
Pedro Escaleirab41de172022-04-02 00:44:08 +0100372 direct = self.fs.path + "/" + cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000373 shutil.rmtree(direct, ignore_errors=True)
374
375 return True
376
garciadeblas04393192022-06-08 15:39:24 +0200377 def _is_helm_chart_a_file(self, chart_name: str):
378 return chart_name.count("/") > 1
379
lloretgalleg095392b2020-11-20 11:28:08 +0000380 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100381 self,
382 cluster_id: str,
383 kdu_model: str,
384 paths: dict,
385 env: dict,
386 kdu_instance: str,
387 atomic: bool = True,
388 timeout: float = 300,
389 params: dict = None,
390 db_dict: dict = None,
391 kdu_name: str = None,
392 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000393 ):
bravof7bd5c6a2021-11-17 11:14:57 -0300394 # init env, paths
395 paths, env = self._init_paths_env(
396 cluster_name=cluster_id, create_if_not_exist=True
397 )
398
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000399 # params to str
400 params_str, file_to_delete = self._params_to_file_option(
401 cluster_id=cluster_id, params=params
402 )
403
404 # version
aktas867418c2021-10-19 18:26:13 +0300405 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000406
Pedro Escaleira2f0692e2022-06-04 19:14:11 +0100407 _, repo = self._split_repo(kdu_model)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200408 if repo:
limon537d2932022-07-21 13:55:55 +0200409 await self.repo_update(cluster_id, repo)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200410
garciadeblas82b591c2021-03-24 09:22:13 +0100411 command = self._get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300412 kdu_model,
413 kdu_instance,
414 namespace,
415 params_str,
416 version,
417 atomic,
418 timeout,
419 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100420 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000421
422 self.log.debug("installing: {}".format(command))
423
424 if atomic:
425 # exec helm in a task
426 exec_task = asyncio.ensure_future(
427 coro_or_future=self._local_async_exec(
428 command=command, raise_exception_on_error=False, env=env
429 )
430 )
431
432 # write status in another task
433 status_task = asyncio.ensure_future(
434 coro_or_future=self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000440 )
441 )
442
443 # wait for execution task
444 await asyncio.wait([exec_task])
445
446 # cancel status task
447 status_task.cancel()
448
449 output, rc = exec_task.result()
450
451 else:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000452 output, rc = await self._local_async_exec(
453 command=command, raise_exception_on_error=False, env=env
454 )
455
456 # remove temporal values yaml file
457 if file_to_delete:
458 os.remove(file_to_delete)
459
460 # write final status
461 await self._store_status(
462 cluster_id=cluster_id,
463 kdu_instance=kdu_instance,
464 namespace=namespace,
465 db_dict=db_dict,
466 operation="install",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000467 )
468
469 if rc != 0:
470 msg = "Error executing command: {}\nOutput: {}".format(command, output)
471 self.log.error(msg)
472 raise K8sException(msg)
473
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000474 async def upgrade(
475 self,
476 cluster_uuid: str,
477 kdu_instance: str,
478 kdu_model: str = None,
479 atomic: bool = True,
480 timeout: float = 300,
481 params: dict = None,
482 db_dict: dict = None,
483 ):
Pedro Escaleirab41de172022-04-02 00:44:08 +0100484 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000485
486 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100487 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000488
489 # look for instance to obtain namespace
490 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
491 if not instance_info:
492 raise K8sException("kdu_instance {} not found".format(kdu_instance))
493
494 # init env, paths
495 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100496 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000497 )
498
bravof7bd5c6a2021-11-17 11:14:57 -0300499 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100500 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300501
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000502 # params to str
503 params_str, file_to_delete = self._params_to_file_option(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100504 cluster_id=cluster_uuid, params=params
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000505 )
506
507 # version
aktas867418c2021-10-19 18:26:13 +0300508 kdu_model, version = self._split_version(kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000509
Pedro Escaleira2f0692e2022-06-04 19:14:11 +0100510 _, repo = self._split_repo(kdu_model)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200511 if repo:
limon537d2932022-07-21 13:55:55 +0200512 await self.repo_update(cluster_uuid, repo)
garciadeblas7faf4ec2022-04-08 22:53:25 +0200513
garciadeblas82b591c2021-03-24 09:22:13 +0100514 command = self._get_upgrade_command(
515 kdu_model,
516 kdu_instance,
517 instance_info["namespace"],
518 params_str,
519 version,
520 atomic,
521 timeout,
bravof7bd5c6a2021-11-17 11:14:57 -0300522 paths["kube_config"],
garciadeblas82b591c2021-03-24 09:22:13 +0100523 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000524
525 self.log.debug("upgrading: {}".format(command))
526
527 if atomic:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000528 # exec helm in a task
529 exec_task = asyncio.ensure_future(
530 coro_or_future=self._local_async_exec(
531 command=command, raise_exception_on_error=False, env=env
532 )
533 )
534 # write status in another task
535 status_task = asyncio.ensure_future(
536 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100537 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000538 kdu_instance=kdu_instance,
539 namespace=instance_info["namespace"],
540 db_dict=db_dict,
541 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000542 )
543 )
544
545 # wait for execution task
546 await asyncio.wait([exec_task])
547
548 # cancel status task
549 status_task.cancel()
550 output, rc = exec_task.result()
551
552 else:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000553 output, rc = await self._local_async_exec(
554 command=command, raise_exception_on_error=False, env=env
555 )
556
557 # remove temporal values yaml file
558 if file_to_delete:
559 os.remove(file_to_delete)
560
561 # write final status
562 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100563 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000564 kdu_instance=kdu_instance,
565 namespace=instance_info["namespace"],
566 db_dict=db_dict,
567 operation="upgrade",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000568 )
569
570 if rc != 0:
571 msg = "Error executing command: {}\nOutput: {}".format(command, output)
572 self.log.error(msg)
573 raise K8sException(msg)
574
575 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100576 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000577
578 # return new revision number
579 instance = await self.get_instance_info(
580 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
581 )
582 if instance:
583 revision = int(instance.get("revision"))
584 self.log.debug("New revision: {}".format(revision))
585 return revision
586 else:
587 return 0
588
aktas2962f3e2021-03-15 11:05:35 +0300589 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100590 self,
591 kdu_instance: str,
592 scale: int,
593 resource_name: str,
594 total_timeout: float = 1800,
aktas867418c2021-10-19 18:26:13 +0300595 cluster_uuid: str = None,
596 kdu_model: str = None,
597 atomic: bool = True,
598 db_dict: dict = None,
garciadeblas82b591c2021-03-24 09:22:13 +0100599 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300600 ):
aktas867418c2021-10-19 18:26:13 +0300601 """Scale a resource in a Helm Chart.
602
603 Args:
604 kdu_instance: KDU instance name
605 scale: Scale to which to set the resource
606 resource_name: Resource name
607 total_timeout: The time, in seconds, to wait
608 cluster_uuid: The UUID of the cluster
609 kdu_model: The chart reference
610 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
611 The --wait flag will be set automatically if --atomic is used
612 db_dict: Dictionary for any additional data
613 kwargs: Additional parameters
614
615 Returns:
616 True if successful, False otherwise
617 """
618
Pedro Escaleirab41de172022-04-02 00:44:08 +0100619 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300620 if resource_name:
621 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100622 resource_name, kdu_model, cluster_uuid
aktas867418c2021-10-19 18:26:13 +0300623 )
624
625 self.log.debug(debug_mgs)
626
627 # look for instance to obtain namespace
628 # get_instance_info function calls the sync command
629 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
630 if not instance_info:
631 raise K8sException("kdu_instance {} not found".format(kdu_instance))
632
633 # init env, paths
634 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100635 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300636 )
637
638 # version
639 kdu_model, version = self._split_version(kdu_model)
640
641 repo_url = await self._find_repo(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300642
643 _, replica_str = await self._get_replica_count_url(
644 kdu_model, repo_url, resource_name
645 )
646
647 command = self._get_upgrade_scale_command(
648 kdu_model,
649 kdu_instance,
650 instance_info["namespace"],
651 scale,
652 version,
653 atomic,
654 replica_str,
655 total_timeout,
656 resource_name,
657 paths["kube_config"],
658 )
659
660 self.log.debug("scaling: {}".format(command))
661
662 if atomic:
663 # exec helm in a task
664 exec_task = asyncio.ensure_future(
665 coro_or_future=self._local_async_exec(
666 command=command, raise_exception_on_error=False, env=env
667 )
668 )
669 # write status in another task
670 status_task = asyncio.ensure_future(
671 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100672 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300673 kdu_instance=kdu_instance,
674 namespace=instance_info["namespace"],
675 db_dict=db_dict,
676 operation="scale",
aktas867418c2021-10-19 18:26:13 +0300677 )
678 )
679
680 # wait for execution task
681 await asyncio.wait([exec_task])
682
683 # cancel status task
684 status_task.cancel()
685 output, rc = exec_task.result()
686
687 else:
688 output, rc = await self._local_async_exec(
689 command=command, raise_exception_on_error=False, env=env
690 )
691
692 # write final status
693 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100694 cluster_id=cluster_uuid,
aktas867418c2021-10-19 18:26:13 +0300695 kdu_instance=kdu_instance,
696 namespace=instance_info["namespace"],
697 db_dict=db_dict,
698 operation="scale",
aktas867418c2021-10-19 18:26:13 +0300699 )
700
701 if rc != 0:
702 msg = "Error executing command: {}\nOutput: {}".format(command, output)
703 self.log.error(msg)
704 raise K8sException(msg)
705
706 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100707 self.fs.reverse_sync(from_path=cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300708
709 return True
aktas2962f3e2021-03-15 11:05:35 +0300710
711 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100712 self,
713 resource_name: str,
714 kdu_instance: str,
aktas867418c2021-10-19 18:26:13 +0300715 cluster_uuid: str,
716 kdu_model: str,
garciadeblas82b591c2021-03-24 09:22:13 +0100717 **kwargs,
aktas867418c2021-10-19 18:26:13 +0300718 ) -> int:
719 """Get a resource scale count.
720
721 Args:
722 cluster_uuid: The UUID of the cluster
723 resource_name: Resource name
724 kdu_instance: KDU instance name
Pedro Escaleira547f8232022-06-03 19:48:46 +0100725 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +0300726 kwargs: Additional parameters
727
728 Returns:
729 Resource instance count
730 """
731
aktas867418c2021-10-19 18:26:13 +0300732 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100733 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
aktas867418c2021-10-19 18:26:13 +0300734 )
735
736 # look for instance to obtain namespace
737 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
738 if not instance_info:
739 raise K8sException("kdu_instance {} not found".format(kdu_instance))
740
741 # init env, paths
Pedro Escaleira5d542b52022-06-04 22:21:57 +0100742 paths, _ = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100743 cluster_name=cluster_uuid, create_if_not_exist=True
aktas867418c2021-10-19 18:26:13 +0300744 )
745
746 replicas = await self._get_replica_count_instance(
Pedro Escaleiraaa5deb72022-06-05 01:29:57 +0100747 kdu_instance=kdu_instance,
748 namespace=instance_info["namespace"],
749 kubeconfig=paths["kube_config"],
750 resource_name=resource_name,
aktas867418c2021-10-19 18:26:13 +0300751 )
752
Pedro Escaleira5d542b52022-06-04 22:21:57 +0100753 self.log.debug(
754 f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
755 )
756
aktas867418c2021-10-19 18:26:13 +0300757 # Get default value if scale count is not found from provided values
Pedro Escaleira5d542b52022-06-04 22:21:57 +0100758 # Important note: this piece of code shall only be executed in the first scaling operation,
759 # since it is expected that the _get_replica_count_instance is able to obtain the number of
760 # replicas when a scale operation was already conducted previously for this KDU/resource!
761 if replicas is None:
Pedro Escaleira547f8232022-06-03 19:48:46 +0100762 repo_url = await self._find_repo(
763 kdu_model=kdu_model, cluster_uuid=cluster_uuid
764 )
aktas867418c2021-10-19 18:26:13 +0300765 replicas, _ = await self._get_replica_count_url(
Pedro Escaleira547f8232022-06-03 19:48:46 +0100766 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
aktas867418c2021-10-19 18:26:13 +0300767 )
768
Pedro Escaleira5d542b52022-06-04 22:21:57 +0100769 self.log.debug(
770 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
771 f"{resource_name} obtained: {replicas}"
772 )
773
774 if replicas is None:
775 msg = "Replica count not found. Cannot be scaled"
776 self.log.error(msg)
777 raise K8sException(msg)
aktas867418c2021-10-19 18:26:13 +0300778
779 return int(replicas)
aktas2962f3e2021-03-15 11:05:35 +0300780
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000781 async def rollback(
782 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
783 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000784 self.log.debug(
785 "rollback kdu_instance {} to revision {} from cluster {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100786 kdu_instance, revision, cluster_uuid
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000787 )
788 )
789
790 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100791 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000792
793 # look for instance to obtain namespace
794 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
795 if not instance_info:
796 raise K8sException("kdu_instance {} not found".format(kdu_instance))
797
798 # init env, paths
799 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100800 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000801 )
802
bravof7bd5c6a2021-11-17 11:14:57 -0300803 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100804 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300805
garciadeblas82b591c2021-03-24 09:22:13 +0100806 command = self._get_rollback_command(
bravof7bd5c6a2021-11-17 11:14:57 -0300807 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
garciadeblas82b591c2021-03-24 09:22:13 +0100808 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000809
810 self.log.debug("rolling_back: {}".format(command))
811
812 # exec helm in a task
813 exec_task = asyncio.ensure_future(
814 coro_or_future=self._local_async_exec(
815 command=command, raise_exception_on_error=False, env=env
816 )
817 )
818 # write status in another task
819 status_task = asyncio.ensure_future(
820 coro_or_future=self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100821 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000822 kdu_instance=kdu_instance,
823 namespace=instance_info["namespace"],
824 db_dict=db_dict,
825 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000826 )
827 )
828
829 # wait for execution task
830 await asyncio.wait([exec_task])
831
832 # cancel status task
833 status_task.cancel()
834
835 output, rc = exec_task.result()
836
837 # write final status
838 await self._store_status(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100839 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000840 kdu_instance=kdu_instance,
841 namespace=instance_info["namespace"],
842 db_dict=db_dict,
843 operation="rollback",
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000844 )
845
846 if rc != 0:
847 msg = "Error executing command: {}\nOutput: {}".format(command, output)
848 self.log.error(msg)
849 raise K8sException(msg)
850
851 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100852 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000853
854 # return new revision number
855 instance = await self.get_instance_info(
856 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
857 )
858 if instance:
859 revision = int(instance.get("revision"))
860 self.log.debug("New revision: {}".format(revision))
861 return revision
862 else:
863 return 0
864
David Garciaeb8943a2021-04-12 12:07:37 +0200865 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000866 """
867 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
868 (this call should happen after all _terminate-config-primitive_ of the VNF
869 are invoked).
870
871 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
872 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200873 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000874 :return: True if successful
875 """
876
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000877 self.log.debug(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100878 "uninstall kdu_instance {} from cluster {}".format(
879 kdu_instance, cluster_uuid
880 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000881 )
882
883 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100884 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000885
886 # look for instance to obtain namespace
887 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
888 if not instance_info:
David Garcia7add1872021-08-18 14:52:52 +0200889 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
890 return True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000891 # init env, paths
892 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +0100893 cluster_name=cluster_uuid, create_if_not_exist=True
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000894 )
895
bravof7bd5c6a2021-11-17 11:14:57 -0300896 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100897 self.fs.sync(from_path=cluster_uuid)
bravof7bd5c6a2021-11-17 11:14:57 -0300898
899 command = self._get_uninstall_command(
900 kdu_instance, instance_info["namespace"], paths["kube_config"]
901 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000902 output, _rc = await self._local_async_exec(
903 command=command, raise_exception_on_error=True, env=env
904 )
905
906 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100907 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000908
909 return self._output_to_table(output)
910
911 async def instances_list(self, cluster_uuid: str) -> list:
912 """
913 returns a list of deployed releases in a cluster
914
915 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
916 :return:
917 """
918
Pedro Escaleirab41de172022-04-02 00:44:08 +0100919 self.log.debug("list releases for cluster {}".format(cluster_uuid))
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000920
921 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +0100922 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000923
924 # execute internal command
Pedro Escaleirab41de172022-04-02 00:44:08 +0100925 result = await self._instances_list(cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000926
927 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +0100928 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000929
930 return result
931
932 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
933 instances = await self.instances_list(cluster_uuid=cluster_uuid)
934 for instance in instances:
935 if instance.get("name") == kdu_instance:
936 return instance
937 self.log.debug("Instance {} not found".format(kdu_instance))
938 return None
939
aticig8070c3c2022-04-18 00:31:42 +0300940 async def upgrade_charm(
941 self,
942 ee_id: str = None,
943 path: str = None,
944 charm_id: str = None,
945 charm_type: str = None,
946 timeout: float = None,
947 ) -> str:
948 """This method upgrade charms in VNFs
949
950 Args:
951 ee_id: Execution environment id
952 path: Local path to the charm
953 charm_id: charm-id
954 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
955 timeout: (Float) Timeout for the ns update operation
956
957 Returns:
958 The output of the update operation if status equals to "completed"
959 """
960 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
961
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000962 async def exec_primitive(
963 self,
964 cluster_uuid: str = None,
965 kdu_instance: str = None,
966 primitive_name: str = None,
967 timeout: float = 300,
968 params: dict = None,
969 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200970 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000971 ) -> str:
972 """Exec primitive (Juju action)
973
974 :param cluster_uuid: The UUID of the cluster or namespace:cluster
975 :param kdu_instance: The unique name of the KDU instance
976 :param primitive_name: Name of action that will be executed
977 :param timeout: Timeout for action execution
978 :param params: Dictionary of all the parameters needed for the action
979 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200980 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000981
982 :return: Returns the output of the action
983 """
984 raise K8sException(
985 "KDUs deployed with Helm don't support actions "
986 "different from rollback, upgrade and status"
987 )
988
garciadeblas82b591c2021-03-24 09:22:13 +0100989 async def get_services(
990 self, cluster_uuid: str, kdu_instance: str, namespace: str
991 ) -> list:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000992 """
993 Returns a list of services defined for the specified kdu instance.
994
995 :param cluster_uuid: UUID of a K8s cluster known by OSM
996 :param kdu_instance: unique name for the KDU instance
997 :param namespace: K8s namespace used by the KDU instance
998 :return: If successful, it will return a list of services, Each service
999 can have the following data:
1000 - `name` of the service
1001 - `type` type of service in the k8 cluster
1002 - `ports` List of ports offered by the service, for each port includes at least
1003 name, port, protocol
1004 - `cluster_ip` Internal ip to be used inside k8s cluster
1005 - `external_ip` List of external ips (in case they are available)
1006 """
1007
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001008 self.log.debug(
1009 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1010 cluster_uuid, kdu_instance
1011 )
1012 )
1013
bravof7bd5c6a2021-11-17 11:14:57 -03001014 # init env, paths
1015 paths, env = self._init_paths_env(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001016 cluster_name=cluster_uuid, create_if_not_exist=True
bravof7bd5c6a2021-11-17 11:14:57 -03001017 )
1018
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001019 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001020 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001021
1022 # get list of services names for kdu
bravof7bd5c6a2021-11-17 11:14:57 -03001023 service_names = await self._get_services(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001024 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
bravof7bd5c6a2021-11-17 11:14:57 -03001025 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001026
1027 service_list = []
1028 for service in service_names:
Pedro Escaleirab41de172022-04-02 00:44:08 +01001029 service = await self._get_service(cluster_uuid, service, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001030 service_list.append(service)
1031
1032 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001033 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001034
1035 return service_list
1036
garciadeblas82b591c2021-03-24 09:22:13 +01001037 async def get_service(
1038 self, cluster_uuid: str, service_name: str, namespace: str
1039 ) -> object:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001040 self.log.debug(
1041 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
garciadeblas82b591c2021-03-24 09:22:13 +01001042 service_name, namespace, cluster_uuid
1043 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001044 )
1045
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001046 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001047 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001048
Pedro Escaleirab41de172022-04-02 00:44:08 +01001049 service = await self._get_service(cluster_uuid, service_name, namespace)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001050
1051 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001052 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001053
1054 return service
1055
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001056 async def status_kdu(
1057 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1058 ) -> Union[str, dict]:
David Garciaeb8943a2021-04-12 12:07:37 +02001059 """
1060 This call would retrieve tha current state of a given KDU instance. It would be
1061 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1062 values_ of the configuration parameters applied to a given instance. This call
1063 would be based on the `status` call.
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001064
David Garciaeb8943a2021-04-12 12:07:37 +02001065 :param cluster_uuid: UUID of a K8s cluster known by OSM
1066 :param kdu_instance: unique name for the KDU instance
1067 :param kwargs: Additional parameters (None yet)
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001068 :param yaml_format: if the return shall be returned as an YAML string or as a
1069 dictionary
David Garciaeb8943a2021-04-12 12:07:37 +02001070 :return: If successful, it will return the following vector of arguments:
1071 - K8s `namespace` in the cluster where the KDU lives
1072 - `state` of the KDU instance. It can be:
1073 - UNKNOWN
1074 - DEPLOYED
1075 - DELETED
1076 - SUPERSEDED
1077 - FAILED or
1078 - DELETING
1079 - List of `resources` (objects) that this release consists of, sorted by kind,
1080 and the status of those resources
1081 - Last `deployment_time`.
1082
1083 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001084 self.log.debug(
1085 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1086 cluster_uuid, kdu_instance
1087 )
1088 )
1089
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001090 # sync local dir
Pedro Escaleirab41de172022-04-02 00:44:08 +01001091 self.fs.sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001092
1093 # get instance: needed to obtain namespace
Pedro Escaleirab41de172022-04-02 00:44:08 +01001094 instances = await self._instances_list(cluster_id=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001095 for instance in instances:
1096 if instance.get("name") == kdu_instance:
1097 break
1098 else:
1099 # instance does not exist
garciadeblas82b591c2021-03-24 09:22:13 +01001100 raise K8sException(
1101 "Instance name: {} not found in cluster: {}".format(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001102 kdu_instance, cluster_uuid
garciadeblas82b591c2021-03-24 09:22:13 +01001103 )
1104 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001105
1106 status = await self._status_kdu(
Pedro Escaleirab41de172022-04-02 00:44:08 +01001107 cluster_id=cluster_uuid,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001108 kdu_instance=kdu_instance,
1109 namespace=instance["namespace"],
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001110 yaml_format=yaml_format,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001111 show_error_log=True,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001112 )
1113
1114 # sync fs
Pedro Escaleirab41de172022-04-02 00:44:08 +01001115 self.fs.reverse_sync(from_path=cluster_uuid)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001116
1117 return status
1118
aktas867418c2021-10-19 18:26:13 +03001119 async def get_values_kdu(
1120 self, kdu_instance: str, namespace: str, kubeconfig: str
1121 ) -> str:
aktas867418c2021-10-19 18:26:13 +03001122 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1123
1124 return await self._exec_get_command(
1125 get_command="values",
1126 kdu_instance=kdu_instance,
1127 namespace=namespace,
1128 kubeconfig=kubeconfig,
1129 )
1130
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001131 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001132 """Method to obtain the Helm Chart package's values
1133
1134 Args:
1135 kdu_model: The name or path of an Helm Chart
1136 repo_url: Helm Chart repository url
1137
1138 Returns:
1139 str: the values of the Helm Chart package
1140 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001141
1142 self.log.debug(
1143 "inspect kdu_model values {} from (optional) repo: {}".format(
1144 kdu_model, repo_url
1145 )
1146 )
1147
aktas867418c2021-10-19 18:26:13 +03001148 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001149 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1150 )
1151
1152 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001153 self.log.debug(
1154 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1155 )
1156
aktas867418c2021-10-19 18:26:13 +03001157 return await self._exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001158 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1159 )
1160
1161 async def synchronize_repos(self, cluster_uuid: str):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001162 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1163 try:
1164 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1165 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1166
1167 local_repo_list = await self.repo_list(cluster_uuid)
1168 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1169
1170 deleted_repo_list = []
1171 added_repo_dict = {}
1172
1173 # iterate over the list of repos in the database that should be
1174 # added if not present
1175 for repo_name, db_repo in db_repo_dict.items():
1176 try:
1177 # check if it is already present
1178 curr_repo_url = local_repo_dict.get(db_repo["name"])
1179 repo_id = db_repo.get("_id")
1180 if curr_repo_url != db_repo["url"]:
1181 if curr_repo_url:
garciadeblas82b591c2021-03-24 09:22:13 +01001182 self.log.debug(
1183 "repo {} url changed, delete and and again".format(
1184 db_repo["url"]
1185 )
1186 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001187 await self.repo_remove(cluster_uuid, db_repo["name"])
1188 deleted_repo_list.append(repo_id)
1189
1190 # add repo
1191 self.log.debug("add repo {}".format(db_repo["name"]))
bravof0ab522f2021-11-23 19:33:18 -03001192 if "ca_cert" in db_repo:
1193 await self.repo_add(
1194 cluster_uuid,
1195 db_repo["name"],
1196 db_repo["url"],
1197 cert=db_repo["ca_cert"],
1198 )
1199 else:
1200 await self.repo_add(
1201 cluster_uuid,
1202 db_repo["name"],
1203 db_repo["url"],
1204 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001205 added_repo_dict[repo_id] = db_repo["name"]
1206 except Exception as e:
1207 raise K8sException(
1208 "Error adding repo id: {}, err_msg: {} ".format(
1209 repo_id, repr(e)
1210 )
1211 )
1212
1213 # Delete repos that are present but not in nbi_list
1214 for repo_name in local_repo_dict:
1215 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1216 self.log.debug("delete repo {}".format(repo_name))
1217 try:
1218 await self.repo_remove(cluster_uuid, repo_name)
1219 deleted_repo_list.append(repo_name)
1220 except Exception as e:
1221 self.warning(
1222 "Error deleting repo, name: {}, err_msg: {}".format(
1223 repo_name, str(e)
1224 )
1225 )
1226
1227 return deleted_repo_list, added_repo_dict
1228
1229 except K8sException:
1230 raise
1231 except Exception as e:
1232 # Do not raise errors synchronizing repos
1233 self.log.error("Error synchronizing repos: {}".format(e))
1234 raise Exception("Error synchronizing repos: {}".format(e))
1235
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001236 def _get_db_repos_dict(self, repo_ids: list):
1237 db_repos_dict = {}
1238 for repo_id in repo_ids:
1239 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1240 db_repos_dict[db_repo["name"]] = db_repo
1241 return db_repos_dict
1242
1243 """
1244 ####################################################################################
1245 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1246 ####################################################################################
1247 """
1248
1249 @abc.abstractmethod
1250 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1251 """
1252 Creates and returns base cluster and kube dirs and returns them.
1253 Also created helm3 dirs according to new directory specification, paths are
1254 not returned but assigned to helm environment variables
1255
1256 :param cluster_name: cluster_name
1257 :return: Dictionary with config_paths and dictionary with helm environment variables
1258 """
1259
1260 @abc.abstractmethod
1261 async def _cluster_init(self, cluster_id, namespace, paths, env):
1262 """
1263 Implements the helm version dependent cluster initialization
1264 """
1265
1266 @abc.abstractmethod
1267 async def _instances_list(self, cluster_id):
1268 """
1269 Implements the helm version dependent helm instances list
1270 """
1271
1272 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001273 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001274 """
1275 Implements the helm version dependent method to obtain services from a helm instance
1276 """
1277
1278 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001279 async def _status_kdu(
1280 self,
1281 cluster_id: str,
1282 kdu_instance: str,
1283 namespace: str = None,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001284 yaml_format: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001285 show_error_log: bool = False,
Pedro Escaleiraa8980cc2022-04-05 17:32:13 +01001286 ) -> Union[str, dict]:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001287 """
1288 Implements the helm version dependent method to obtain status of a helm instance
1289 """
1290
1291 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001292 def _get_install_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001293 self,
1294 kdu_model,
1295 kdu_instance,
1296 namespace,
1297 params_str,
1298 version,
1299 atomic,
1300 timeout,
1301 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001302 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001303 """
1304 Obtain command to be executed to delete the indicated instance
1305 """
1306
1307 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001308 def _get_upgrade_scale_command(
1309 self,
1310 kdu_model,
1311 kdu_instance,
1312 namespace,
1313 count,
1314 version,
1315 atomic,
1316 replicas,
1317 timeout,
1318 resource_name,
1319 kubeconfig,
1320 ) -> str:
Pedro Escaleira44bd0682022-07-07 22:18:35 +01001321 """Generates the command to scale a Helm Chart release
1322
1323 Args:
1324 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1325 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1326 namespace (str): Namespace where this KDU instance is deployed
1327 scale (int): Scale count
1328 version (str): Constraint with specific version of the Chart to use
1329 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1330 The --wait flag will be set automatically if --atomic is used
1331 replica_str (str): The key under resource_name key where the scale count is stored
1332 timeout (float): The time, in seconds, to wait
1333 resource_name (str): The KDU's resource to scale
1334 kubeconfig (str): Kubeconfig file path
1335
1336 Returns:
1337 str: command to scale a Helm Chart release
1338 """
aktas867418c2021-10-19 18:26:13 +03001339
1340 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001341 def _get_upgrade_command(
bravof7bd5c6a2021-11-17 11:14:57 -03001342 self,
1343 kdu_model,
1344 kdu_instance,
1345 namespace,
1346 params_str,
1347 version,
1348 atomic,
1349 timeout,
1350 kubeconfig,
garciadeblas82b591c2021-03-24 09:22:13 +01001351 ) -> str:
Pedro Escaleira44bd0682022-07-07 22:18:35 +01001352 """Generates the command to upgrade a Helm Chart release
1353
1354 Args:
1355 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1356 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1357 namespace (str): Namespace where this KDU instance is deployed
1358 params_str (str): Params used to upgrade the Helm Chart release
1359 version (str): Constraint with specific version of the Chart to use
1360 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1361 The --wait flag will be set automatically if --atomic is used
1362 timeout (float): The time, in seconds, to wait
1363 kubeconfig (str): Kubeconfig file path
1364
1365 Returns:
1366 str: command to upgrade a Helm Chart release
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001367 """
1368
1369 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001370 def _get_rollback_command(
1371 self, kdu_instance, namespace, revision, kubeconfig
1372 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001373 """
1374 Obtain command to be executed to rollback the indicated instance
1375 """
1376
1377 @abc.abstractmethod
bravof7bd5c6a2021-11-17 11:14:57 -03001378 def _get_uninstall_command(
1379 self, kdu_instance: str, namespace: str, kubeconfig: str
1380 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001381 """
1382 Obtain command to be executed to delete the indicated instance
1383 """
1384
1385 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001386 def _get_inspect_command(
1387 self, show_command: str, kdu_model: str, repo_str: str, version: str
1388 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001389 """Generates the command to obtain the information about an Helm Chart package
1390 (´helm show ...´ command)
1391
1392 Args:
1393 show_command: the second part of the command (`helm show <show_command>`)
1394 kdu_model: The name or path of an Helm Chart
1395 repo_url: Helm Chart repository url
1396 version: constraint with specific version of the Chart to use
1397
1398 Returns:
1399 str: the generated Helm Chart command
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001400 """
1401
1402 @abc.abstractmethod
aktas867418c2021-10-19 18:26:13 +03001403 def _get_get_command(
1404 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1405 ):
1406 """Obtain command to be executed to get information about the kdu instance."""
1407
1408 @abc.abstractmethod
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001409 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1410 """
1411 Method call to uninstall cluster software for helm. This method is dependent
1412 of helm version
1413 For Helm v2 it will be called when Tiller must be uninstalled
1414 For Helm v3 it does nothing and does not need to be callled
1415 """
1416
lloretgalleg095392b2020-11-20 11:28:08 +00001417 @abc.abstractmethod
1418 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1419 """
1420 Obtains the cluster repos identifiers
1421 """
1422
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001423 """
1424 ####################################################################################
1425 ################################### P R I V A T E ##################################
1426 ####################################################################################
1427 """
1428
1429 @staticmethod
1430 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1431 if os.path.exists(filename):
1432 return True
1433 else:
1434 msg = "File {} does not exist".format(filename)
1435 if exception_if_not_exists:
1436 raise K8sException(msg)
1437
1438 @staticmethod
1439 def _remove_multiple_spaces(strobj):
1440 strobj = strobj.strip()
1441 while " " in strobj:
1442 strobj = strobj.replace(" ", " ")
1443 return strobj
1444
1445 @staticmethod
1446 def _output_to_lines(output: str) -> list:
1447 output_lines = list()
1448 lines = output.splitlines(keepends=False)
1449 for line in lines:
1450 line = line.strip()
1451 if len(line) > 0:
1452 output_lines.append(line)
1453 return output_lines
1454
1455 @staticmethod
1456 def _output_to_table(output: str) -> list:
1457 output_table = list()
1458 lines = output.splitlines(keepends=False)
1459 for line in lines:
1460 line = line.replace("\t", " ")
1461 line_list = list()
1462 output_table.append(line_list)
1463 cells = line.split(sep=" ")
1464 for cell in cells:
1465 cell = cell.strip()
1466 if len(cell) > 0:
1467 line_list.append(cell)
1468 return output_table
1469
1470 @staticmethod
1471 def _parse_services(output: str) -> list:
1472 lines = output.splitlines(keepends=False)
1473 services = []
1474 for line in lines:
1475 line = line.replace("\t", " ")
1476 cells = line.split(sep=" ")
1477 if len(cells) > 0 and cells[0].startswith("service/"):
1478 elems = cells[0].split(sep="/")
1479 if len(elems) > 1:
1480 services.append(elems[1])
1481 return services
1482
1483 @staticmethod
1484 def _get_deep(dictionary: dict, members: tuple):
1485 target = dictionary
1486 value = None
1487 try:
1488 for m in members:
1489 value = target.get(m)
1490 if not value:
1491 return None
1492 else:
1493 target = value
1494 except Exception:
1495 pass
1496 return value
1497
1498 # find key:value in several lines
1499 @staticmethod
1500 def _find_in_lines(p_lines: list, p_key: str) -> str:
1501 for line in p_lines:
1502 try:
1503 if line.startswith(p_key + ":"):
1504 parts = line.split(":")
1505 the_value = parts[1].strip()
1506 return the_value
1507 except Exception:
1508 # ignore it
1509 pass
1510 return None
1511
1512 @staticmethod
1513 def _lower_keys_list(input_list: list):
1514 """
1515 Transform the keys in a list of dictionaries to lower case and returns a new list
1516 of dictionaries
1517 """
1518 new_list = []
David Garcia4395cfa2021-05-28 16:21:51 +02001519 if input_list:
1520 for dictionary in input_list:
1521 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1522 new_list.append(new_dict)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001523 return new_list
1524
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001525 async def _local_async_exec(
1526 self,
1527 command: str,
1528 raise_exception_on_error: bool = False,
1529 show_error_log: bool = True,
1530 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001531 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001532 ) -> (str, int):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001533 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001534 self.log.debug(
1535 "Executing async local command: {}, env: {}".format(command, env)
1536 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001537
1538 # split command
1539 command = shlex.split(command)
1540
1541 environ = os.environ.copy()
1542 if env:
1543 environ.update(env)
1544
1545 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001546 async with self.cmd_lock:
1547 process = await asyncio.create_subprocess_exec(
1548 *command,
1549 stdout=asyncio.subprocess.PIPE,
1550 stderr=asyncio.subprocess.PIPE,
1551 env=environ,
1552 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001553
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001554 # wait for command terminate
1555 stdout, stderr = await process.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001556
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001557 return_code = process.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001558
1559 output = ""
1560 if stdout:
1561 output = stdout.decode("utf-8").strip()
1562 # output = stdout.decode()
1563 if stderr:
1564 output = stderr.decode("utf-8").strip()
1565 # output = stderr.decode()
1566
1567 if return_code != 0 and show_error_log:
1568 self.log.debug(
1569 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1570 )
1571 else:
1572 self.log.debug("Return code: {}".format(return_code))
1573
1574 if raise_exception_on_error and return_code != 0:
1575 raise K8sException(output)
1576
1577 if encode_utf8:
1578 output = output.encode("utf-8").strip()
1579 output = str(output).replace("\\n", "\n")
1580
1581 return output, return_code
1582
1583 except asyncio.CancelledError:
1584 raise
1585 except K8sException:
1586 raise
1587 except Exception as e:
1588 msg = "Exception executing command: {} -> {}".format(command, e)
1589 self.log.error(msg)
1590 if raise_exception_on_error:
1591 raise K8sException(e) from e
1592 else:
1593 return "", -1
1594
garciadeblas82b591c2021-03-24 09:22:13 +01001595 async def _local_async_exec_pipe(
1596 self,
1597 command1: str,
1598 command2: str,
1599 raise_exception_on_error: bool = True,
1600 show_error_log: bool = True,
1601 encode_utf8: bool = False,
1602 env: dict = None,
1603 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001604 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1605 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1606 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001607 self.log.debug(
1608 "Executing async local command: {}, env: {}".format(command, env)
1609 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001610
1611 # split command
1612 command1 = shlex.split(command1)
1613 command2 = shlex.split(command2)
1614
1615 environ = os.environ.copy()
1616 if env:
1617 environ.update(env)
1618
1619 try:
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001620 async with self.cmd_lock:
1621 read, write = os.pipe()
1622 await asyncio.create_subprocess_exec(
1623 *command1, stdout=write, env=environ
1624 )
1625 os.close(write)
1626 process_2 = await asyncio.create_subprocess_exec(
1627 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1628 )
1629 os.close(read)
1630 stdout, stderr = await process_2.communicate()
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001631
Pedro Escaleira1f222a92022-06-20 15:40:43 +01001632 return_code = process_2.returncode
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001633
1634 output = ""
1635 if stdout:
1636 output = stdout.decode("utf-8").strip()
1637 # output = stdout.decode()
1638 if stderr:
1639 output = stderr.decode("utf-8").strip()
1640 # output = stderr.decode()
1641
1642 if return_code != 0 and show_error_log:
1643 self.log.debug(
1644 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1645 )
1646 else:
1647 self.log.debug("Return code: {}".format(return_code))
1648
1649 if raise_exception_on_error and return_code != 0:
1650 raise K8sException(output)
1651
1652 if encode_utf8:
1653 output = output.encode("utf-8").strip()
1654 output = str(output).replace("\\n", "\n")
1655
1656 return output, return_code
1657 except asyncio.CancelledError:
1658 raise
1659 except K8sException:
1660 raise
1661 except Exception as e:
1662 msg = "Exception executing command: {} -> {}".format(command, e)
1663 self.log.error(msg)
1664 if raise_exception_on_error:
1665 raise K8sException(e) from e
1666 else:
1667 return "", -1
1668
1669 async def _get_service(self, cluster_id, service_name, namespace):
1670 """
1671 Obtains the data of the specified service in the k8cluster.
1672
1673 :param cluster_id: id of a K8s cluster known by OSM
1674 :param service_name: name of the K8s service in the specified namespace
1675 :param namespace: K8s namespace used by the KDU instance
1676 :return: If successful, it will return a service with the following data:
1677 - `name` of the service
1678 - `type` type of service in the k8 cluster
1679 - `ports` List of ports offered by the service, for each port includes at least
1680 name, port, protocol
1681 - `cluster_ip` Internal ip to be used inside k8s cluster
1682 - `external_ip` List of external ips (in case they are available)
1683 """
1684
1685 # init config, env
1686 paths, env = self._init_paths_env(
1687 cluster_name=cluster_id, create_if_not_exist=True
1688 )
1689
1690 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1691 self.kubectl_command, paths["kube_config"], namespace, service_name
1692 )
1693
1694 output, _rc = await self._local_async_exec(
1695 command=command, raise_exception_on_error=True, env=env
1696 )
1697
1698 data = yaml.load(output, Loader=yaml.SafeLoader)
1699
1700 service = {
1701 "name": service_name,
1702 "type": self._get_deep(data, ("spec", "type")),
1703 "ports": self._get_deep(data, ("spec", "ports")),
garciadeblas82b591c2021-03-24 09:22:13 +01001704 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001705 }
1706 if service["type"] == "LoadBalancer":
1707 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1708 ip_list = [elem["ip"] for elem in ip_map_list]
1709 service["external_ip"] = ip_list
1710
1711 return service
1712
aktas867418c2021-10-19 18:26:13 +03001713 async def _exec_get_command(
1714 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1715 ):
1716 """Obtains information about the kdu instance."""
1717
1718 full_command = self._get_get_command(
1719 get_command, kdu_instance, namespace, kubeconfig
1720 )
1721
1722 output, _rc = await self._local_async_exec(command=full_command)
1723
1724 return output
1725
1726 async def _exec_inspect_command(
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001727 self, inspect_command: str, kdu_model: str, repo_url: str = None
1728 ):
Pedro Escaleira547f8232022-06-03 19:48:46 +01001729 """Obtains information about an Helm Chart package (´helm show´ command)
1730
1731 Args:
1732 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1733 kdu_model: The name or path of an Helm Chart
1734 repo_url: Helm Chart repository url
1735
1736 Returns:
1737 str: the requested info about the Helm Chart package
1738 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001739
1740 repo_str = ""
1741 if repo_url:
1742 repo_str = " --repo {}".format(repo_url)
1743
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01001744 # Obtain the Chart's name and store it in the var kdu_model
1745 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001746
aktas867418c2021-10-19 18:26:13 +03001747 kdu_model, version = self._split_version(kdu_model)
1748 if version:
1749 version_str = "--version {}".format(version)
1750 else:
1751 version_str = ""
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001752
garciadeblas82b591c2021-03-24 09:22:13 +01001753 full_command = self._get_inspect_command(
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01001754 show_command=inspect_command,
1755 kdu_model=kdu_model,
1756 repo_str=repo_str,
1757 version=version_str,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001758 )
1759
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01001760 output, _ = await self._local_async_exec(command=full_command)
aktas867418c2021-10-19 18:26:13 +03001761
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001762 return output
1763
aktas867418c2021-10-19 18:26:13 +03001764 async def _get_replica_count_url(
1765 self,
1766 kdu_model: str,
Pedro Escaleira547f8232022-06-03 19:48:46 +01001767 repo_url: str = None,
aktas867418c2021-10-19 18:26:13 +03001768 resource_name: str = None,
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001769 ) -> (int, str):
aktas867418c2021-10-19 18:26:13 +03001770 """Get the replica count value in the Helm Chart Values.
1771
1772 Args:
Pedro Escaleira547f8232022-06-03 19:48:46 +01001773 kdu_model: The name or path of an Helm Chart
aktas867418c2021-10-19 18:26:13 +03001774 repo_url: Helm Chart repository url
1775 resource_name: Resource name
1776
1777 Returns:
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001778 A tuple with:
1779 - The number of replicas of the specific instance; if not found, returns None; and
1780 - The string corresponding to the replica count key in the Helm values
aktas867418c2021-10-19 18:26:13 +03001781 """
1782
1783 kdu_values = yaml.load(
Pedro Escaleira547f8232022-06-03 19:48:46 +01001784 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1785 Loader=yaml.SafeLoader,
aktas867418c2021-10-19 18:26:13 +03001786 )
1787
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001788 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1789
aktas867418c2021-10-19 18:26:13 +03001790 if not kdu_values:
1791 raise K8sException(
1792 "kdu_values not found for kdu_model {}".format(kdu_model)
1793 )
1794
1795 if resource_name:
1796 kdu_values = kdu_values.get(resource_name, None)
1797
1798 if not kdu_values:
1799 msg = "resource {} not found in the values in model {}".format(
1800 resource_name, kdu_model
1801 )
1802 self.log.error(msg)
1803 raise K8sException(msg)
1804
1805 duplicate_check = False
1806
1807 replica_str = ""
1808 replicas = None
1809
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001810 if kdu_values.get("replicaCount") is not None:
aktas867418c2021-10-19 18:26:13 +03001811 replicas = kdu_values["replicaCount"]
1812 replica_str = "replicaCount"
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001813 elif kdu_values.get("replicas") is not None:
aktas867418c2021-10-19 18:26:13 +03001814 duplicate_check = True
1815 replicas = kdu_values["replicas"]
1816 replica_str = "replicas"
1817 else:
1818 if resource_name:
1819 msg = (
1820 "replicaCount or replicas not found in the resource"
1821 "{} values in model {}. Cannot be scaled".format(
1822 resource_name, kdu_model
1823 )
1824 )
1825 else:
1826 msg = (
1827 "replicaCount or replicas not found in the values"
1828 "in model {}. Cannot be scaled".format(kdu_model)
1829 )
1830 self.log.error(msg)
1831 raise K8sException(msg)
1832
1833 # Control if replicas and replicaCount exists at the same time
1834 msg = "replicaCount and replicas are exists at the same time"
1835 if duplicate_check:
1836 if "replicaCount" in kdu_values:
1837 self.log.error(msg)
1838 raise K8sException(msg)
1839 else:
1840 if "replicas" in kdu_values:
1841 self.log.error(msg)
1842 raise K8sException(msg)
1843
1844 return replicas, replica_str
1845
1846 async def _get_replica_count_instance(
1847 self,
1848 kdu_instance: str,
1849 namespace: str,
1850 kubeconfig: str,
1851 resource_name: str = None,
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001852 ) -> int:
aktas867418c2021-10-19 18:26:13 +03001853 """Get the replica count value in the instance.
1854
1855 Args:
1856 kdu_instance: The name of the KDU instance
1857 namespace: KDU instance namespace
1858 kubeconfig:
1859 resource_name: Resource name
1860
1861 Returns:
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001862 The number of replicas of the specific instance; if not found, returns None
aktas867418c2021-10-19 18:26:13 +03001863 """
1864
1865 kdu_values = yaml.load(
1866 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1867 Loader=yaml.SafeLoader,
1868 )
1869
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001870 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1871
aktas867418c2021-10-19 18:26:13 +03001872 replicas = None
1873
1874 if kdu_values:
1875 resource_values = (
1876 kdu_values.get(resource_name, None) if resource_name else None
1877 )
Pedro Escaleira5d542b52022-06-04 22:21:57 +01001878
1879 for replica_str in ("replicaCount", "replicas"):
1880 if resource_values:
1881 replicas = resource_values.get(replica_str)
1882 else:
1883 replicas = kdu_values.get(replica_str)
1884
1885 if replicas is not None:
1886 break
aktas867418c2021-10-19 18:26:13 +03001887
1888 return replicas
1889
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001890 async def _store_status(
1891 self,
1892 cluster_id: str,
1893 operation: str,
1894 kdu_instance: str,
1895 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001896 db_dict: dict = None,
Pedro Escaleirab46f88d2022-04-23 19:55:45 +01001897 ) -> None:
1898 """
1899 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1900
1901 :param cluster_id (str): the cluster where the KDU instance is deployed
1902 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1903 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1904 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1905 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1906 values for the keys:
1907 - "collection": The Mongo DB collection to write to
1908 - "filter": The query filter to use in the update process
1909 - "path": The dot separated keys which targets the object to be updated
1910 Defaults to None.
1911 """
1912
1913 try:
1914 detailed_status = await self._status_kdu(
1915 cluster_id=cluster_id,
1916 kdu_instance=kdu_instance,
1917 yaml_format=False,
1918 namespace=namespace,
1919 )
1920
1921 status = detailed_status.get("info").get("description")
1922 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1923
1924 # write status to db
1925 result = await self.write_app_status_to_db(
1926 db_dict=db_dict,
1927 status=str(status),
1928 detailed_status=str(detailed_status),
1929 operation=operation,
1930 )
1931
1932 if not result:
1933 self.log.info("Error writing in database. Task exiting...")
1934
1935 except asyncio.CancelledError as e:
1936 self.log.warning(
1937 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1938 )
1939 except Exception as e:
1940 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001941
1942 # params for use in -f file
1943 # returns values file option and filename (in order to delete it at the end)
1944 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001945 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001946 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001947
1948 def get_random_number():
1949 r = random.randrange(start=1, stop=99999999)
1950 s = str(r)
1951 while len(s) < 10:
1952 s = "0" + s
1953 return s
1954
1955 params2 = dict()
1956 for key in params:
1957 value = params.get(key)
1958 if "!!yaml" in str(value):
David Garcia513cb2d2022-05-31 11:01:09 +02001959 value = yaml.safe_load(value[7:])
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001960 params2[key] = value
1961
1962 values_file = get_random_number() + ".yaml"
1963 with open(values_file, "w") as stream:
1964 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1965
1966 return "-f {}".format(values_file), values_file
1967
1968 return "", None
1969
1970 # params for use in --set option
1971 @staticmethod
1972 def _params_to_set_option(params: dict) -> str:
1973 params_str = ""
1974 if params and len(params) > 0:
1975 start = True
1976 for key in params:
1977 value = params.get(key, None)
1978 if value is not None:
1979 if start:
1980 params_str += "--set "
1981 start = False
1982 else:
1983 params_str += ","
1984 params_str += "{}={}".format(key, value)
1985 return params_str
1986
1987 @staticmethod
David Garciac4da25c2021-02-23 11:47:29 +01001988 def generate_kdu_instance_name(**kwargs):
1989 chart_name = kwargs["kdu_model"]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001990 # check embeded chart (file or dir)
1991 if chart_name.startswith("/"):
1992 # extract file or directory name
David Garcia4ae527e2021-07-26 16:04:59 +02001993 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001994 # check URL
1995 elif "://" in chart_name:
1996 # extract last portion of URL
David Garcia4ae527e2021-07-26 16:04:59 +02001997 chart_name = chart_name[chart_name.rfind("/") + 1 :]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001998
1999 name = ""
2000 for c in chart_name:
2001 if c.isalpha() or c.isnumeric():
2002 name += c
2003 else:
2004 name += "-"
2005 if len(name) > 35:
2006 name = name[0:35]
2007
2008 # if does not start with alpha character, prefix 'a'
2009 if not name[0].isalpha():
2010 name = "a" + name
2011
2012 name += "-"
2013
2014 def get_random_number():
2015 r = random.randrange(start=1, stop=99999999)
2016 s = str(r)
2017 s = s.rjust(10, "0")
2018 return s
2019
2020 name = name + get_random_number()
2021 return name.lower()
aktas867418c2021-10-19 18:26:13 +03002022
2023 def _split_version(self, kdu_model: str) -> (str, str):
2024 version = None
garciadeblas04393192022-06-08 15:39:24 +02002025 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
aktas867418c2021-10-19 18:26:13 +03002026 parts = kdu_model.split(sep=":")
2027 if len(parts) == 2:
2028 version = str(parts[1])
2029 kdu_model = parts[0]
2030 return kdu_model, version
2031
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01002032 def _split_repo(self, kdu_model: str) -> (str, str):
2033 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2034
2035 Args:
2036 kdu_model (str): Associated KDU model
2037
2038 Returns:
2039 (str, str): Tuple with the Chart name in index 0, and the repo name
2040 in index 2; if there was a problem finding them, return None
2041 for both
2042 """
2043
2044 chart_name = None
garciadeblas7faf4ec2022-04-08 22:53:25 +02002045 repo_name = None
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01002046
garciadeblas7faf4ec2022-04-08 22:53:25 +02002047 idx = kdu_model.find("/")
2048 if idx >= 0:
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01002049 chart_name = kdu_model[idx + 1 :]
garciadeblas7faf4ec2022-04-08 22:53:25 +02002050 repo_name = kdu_model[:idx]
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01002051
2052 return chart_name, repo_name
garciadeblas7faf4ec2022-04-08 22:53:25 +02002053
aktas867418c2021-10-19 18:26:13 +03002054 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
Pedro Escaleira547f8232022-06-03 19:48:46 +01002055 """Obtain the Helm repository for an Helm Chart
2056
2057 Args:
2058 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2059 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2060
2061 Returns:
2062 str: the repository URL; if Helm Chart is a local one, the function returns None
2063 """
2064
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01002065 _, repo_name = self._split_repo(kdu_model=kdu_model)
2066
aktas867418c2021-10-19 18:26:13 +03002067 repo_url = None
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01002068 if repo_name:
aktas867418c2021-10-19 18:26:13 +03002069 # Find repository link
2070 local_repo_list = await self.repo_list(cluster_uuid)
2071 for repo in local_repo_list:
Pedro Escaleira2f0692e2022-06-04 19:14:11 +01002072 if repo["name"] == repo_name:
2073 repo_url = repo["url"]
2074 break # it is not necessary to continue the loop if the repo link was found...
2075
aktas867418c2021-10-19 18:26:13 +03002076 return repo_url