blob: b3d7bda3c83f40812ccf03d2a196ea247099dc48 [file] [log] [blame]
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
22import abc
23import asyncio
24import random
25import time
26import shlex
27import shutil
28import stat
29import subprocess
30import os
31import yaml
32from uuid import uuid4
33
34from n2vc.exceptions import K8sException
35from n2vc.k8s_conn import K8sConnector
36
37
38class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
garciadeblas82b591c2021-03-24 09:22:13 +010045
lloretgalleg1c83f2e2020-10-22 09:12:35 +000046 service_account = "osm"
lloretgalleg83e55892020-12-17 12:42:11 +000047 _STABLE_REPO_URL = "https://charts.helm.sh/stable"
lloretgalleg1c83f2e2020-10-22 09:12:35 +000048
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
lloretgalleg83e55892020-12-17 12:42:11 +000057 vca_config: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +000058 ):
59 """
60
61 :param fs: file system for kubernetes and helm configuration
62 :param db: database object to write current operation status
63 :param kubectl_command: path to kubectl executable
64 :param helm_command: path to helm executable
65 :param log: logger
66 :param on_update_db: callback called when k8s connector updates database
67 """
68
69 # parent class
70 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
71
72 self.log.info("Initializing K8S Helm connector")
73
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
lloretgalleg83e55892020-12-17 12:42:11 +000088 # obtain stable repo url from config or apply default
89 if not vca_config or not vca_config.get("stablerepourl"):
90 self._stable_repo_url = self._STABLE_REPO_URL
91 else:
92 self._stable_repo_url = vca_config.get("stablerepourl")
93
lloretgalleg1c83f2e2020-10-22 09:12:35 +000094 @staticmethod
95 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
96 """
97 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
98 cluster_id for backward compatibility
99 """
garciadeblas82b591c2021-03-24 09:22:13 +0100100 namespace, _, cluster_id = cluster_uuid.rpartition(":")
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000101 return namespace, cluster_id
102
103 async def init_env(
garciadeblas82b591c2021-03-24 09:22:13 +0100104 self,
105 k8s_creds: str,
106 namespace: str = "kube-system",
107 reuse_cluster_uuid=None,
108 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000109 ) -> (str, bool):
110 """
111 It prepares a given K8s cluster environment to run Charts
112
113 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
114 '.kube/config'
115 :param namespace: optional namespace to be used for helm. By default,
116 'kube-system' will be used
117 :param reuse_cluster_uuid: existing cluster uuid for reuse
David Garciaeb8943a2021-04-12 12:07:37 +0200118 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000119 :return: uuid of the K8s cluster and True if connector has installed some
120 software in the cluster
121 (on error, an exception will be raised)
122 """
123
124 if reuse_cluster_uuid:
125 namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
126 namespace = namespace_ or namespace
127 else:
128 cluster_id = str(uuid4())
129 cluster_uuid = "{}:{}".format(namespace, cluster_id)
130
garciadeblas82b591c2021-03-24 09:22:13 +0100131 self.log.debug(
132 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
133 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000134
135 paths, env = self._init_paths_env(
136 cluster_name=cluster_id, create_if_not_exist=True
137 )
138 mode = stat.S_IRUSR | stat.S_IWUSR
139 with open(paths["kube_config"], "w", mode) as f:
140 f.write(k8s_creds)
141 os.chmod(paths["kube_config"], 0o600)
142
143 # Code with initialization specific of helm version
144 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
145
146 # sync fs with local data
147 self.fs.reverse_sync(from_path=cluster_id)
148
149 self.log.info("Cluster {} initialized".format(cluster_id))
150
151 return cluster_uuid, n2vc_installed_sw
152
153 async def repo_add(
garciadeblas82b591c2021-03-24 09:22:13 +0100154 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000155 ):
156 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100157 self.log.debug(
158 "Cluster {}, adding {} repository {}. URL: {}".format(
159 cluster_id, repo_type, name, url
160 )
161 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000162
163 # sync local dir
164 self.fs.sync(from_path=cluster_id)
165
166 # init_env
167 paths, env = self._init_paths_env(
168 cluster_name=cluster_id, create_if_not_exist=True
169 )
170
171 # helm repo update
garciadeblas82b591c2021-03-24 09:22:13 +0100172 command = "{} repo update".format(self._helm_command)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000173 self.log.debug("updating repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100174 await self._local_async_exec(
175 command=command, raise_exception_on_error=False, env=env
176 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000177
178 # helm repo add name url
garciadeblas82b591c2021-03-24 09:22:13 +0100179 command = "{} repo add {} {}".format(self._helm_command, name, url)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000180 self.log.debug("adding repo: {}".format(command))
garciadeblas82b591c2021-03-24 09:22:13 +0100181 await self._local_async_exec(
182 command=command, raise_exception_on_error=True, env=env
183 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000184
185 # sync fs
186 self.fs.reverse_sync(from_path=cluster_id)
187
188 async def repo_list(self, cluster_uuid: str) -> list:
189 """
190 Get the list of registered repositories
191
192 :return: list of registered repositories: [ (name, url) .... ]
193 """
194
195 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
196 self.log.debug("list repositories for cluster {}".format(cluster_id))
197
198 # sync local dir
199 self.fs.sync(from_path=cluster_id)
200
201 # config filename
202 paths, env = self._init_paths_env(
203 cluster_name=cluster_id, create_if_not_exist=True
204 )
205
garciadeblas82b591c2021-03-24 09:22:13 +0100206 command = "{} repo list --output yaml".format(self._helm_command)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000207
208 # Set exception to false because if there are no repos just want an empty list
209 output, _rc = await self._local_async_exec(
210 command=command, raise_exception_on_error=False, env=env
211 )
212
213 # sync fs
214 self.fs.reverse_sync(from_path=cluster_id)
215
216 if _rc == 0:
217 if output and len(output) > 0:
218 repos = yaml.load(output, Loader=yaml.SafeLoader)
219 # unify format between helm2 and helm3 setting all keys lowercase
220 return self._lower_keys_list(repos)
221 else:
222 return []
223 else:
224 return []
225
226 async def repo_remove(self, cluster_uuid: str, name: str):
227
228 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
229 self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
230
231 # sync local dir
232 self.fs.sync(from_path=cluster_id)
233
234 # init env, paths
235 paths, env = self._init_paths_env(
236 cluster_name=cluster_id, create_if_not_exist=True
237 )
238
garciadeblas82b591c2021-03-24 09:22:13 +0100239 command = "{} repo remove {}".format(self._helm_command, name)
240 await self._local_async_exec(
241 command=command, raise_exception_on_error=True, env=env
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000242 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000243
244 # sync fs
245 self.fs.reverse_sync(from_path=cluster_id)
246
247 async def reset(
garciadeblas82b591c2021-03-24 09:22:13 +0100248 self,
249 cluster_uuid: str,
250 force: bool = False,
251 uninstall_sw: bool = False,
252 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000253 ) -> bool:
David Garciaeb8943a2021-04-12 12:07:37 +0200254 """Reset a cluster
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000255
David Garciaeb8943a2021-04-12 12:07:37 +0200256 Resets the Kubernetes cluster by removing the helm deployment that represents it.
257
258 :param cluster_uuid: The UUID of the cluster to reset
259 :param force: Boolean to force the reset
260 :param uninstall_sw: Boolean to force the reset
261 :param kwargs: Additional parameters (None yet)
262 :return: Returns True if successful or raises an exception.
263 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000264 namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
garciadeblas82b591c2021-03-24 09:22:13 +0100265 self.log.debug(
266 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
267 cluster_id, uninstall_sw
268 )
269 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000270
271 # sync local dir
272 self.fs.sync(from_path=cluster_id)
273
274 # uninstall releases if needed.
275 if uninstall_sw:
276 releases = await self.instances_list(cluster_uuid=cluster_uuid)
277 if len(releases) > 0:
278 if force:
279 for r in releases:
280 try:
281 kdu_instance = r.get("name")
282 chart = r.get("chart")
283 self.log.debug(
284 "Uninstalling {} -> {}".format(chart, kdu_instance)
285 )
286 await self.uninstall(
287 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
288 )
289 except Exception as e:
290 # will not raise exception as it was found
291 # that in some cases of previously installed helm releases it
292 # raised an error
293 self.log.warn(
garciadeblas82b591c2021-03-24 09:22:13 +0100294 "Error uninstalling release {}: {}".format(
295 kdu_instance, e
296 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000297 )
298 else:
299 msg = (
300 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
301 ).format(cluster_id)
302 self.log.warn(msg)
garciadeblas82b591c2021-03-24 09:22:13 +0100303 uninstall_sw = (
304 False # Allow to remove k8s cluster without removing Tiller
305 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000306
307 if uninstall_sw:
308 await self._uninstall_sw(cluster_id, namespace)
309
310 # delete cluster directory
311 self.log.debug("Removing directory {}".format(cluster_id))
312 self.fs.file_delete(cluster_id, ignore_non_exist=True)
313 # Remove also local directorio if still exist
314 direct = self.fs.path + "/" + cluster_id
315 shutil.rmtree(direct, ignore_errors=True)
316
317 return True
318
lloretgalleg095392b2020-11-20 11:28:08 +0000319 async def _install_impl(
garciadeblas82b591c2021-03-24 09:22:13 +0100320 self,
321 cluster_id: str,
322 kdu_model: str,
323 paths: dict,
324 env: dict,
325 kdu_instance: str,
326 atomic: bool = True,
327 timeout: float = 300,
328 params: dict = None,
329 db_dict: dict = None,
330 kdu_name: str = None,
331 namespace: str = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000332 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000333 # params to str
334 params_str, file_to_delete = self._params_to_file_option(
335 cluster_id=cluster_id, params=params
336 )
337
338 # version
339 version = None
340 if ":" in kdu_model:
341 parts = kdu_model.split(sep=":")
342 if len(parts) == 2:
343 version = str(parts[1])
344 kdu_model = parts[0]
345
garciadeblas82b591c2021-03-24 09:22:13 +0100346 command = self._get_install_command(
347 kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
348 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000349
350 self.log.debug("installing: {}".format(command))
351
352 if atomic:
353 # exec helm in a task
354 exec_task = asyncio.ensure_future(
355 coro_or_future=self._local_async_exec(
356 command=command, raise_exception_on_error=False, env=env
357 )
358 )
359
360 # write status in another task
361 status_task = asyncio.ensure_future(
362 coro_or_future=self._store_status(
363 cluster_id=cluster_id,
364 kdu_instance=kdu_instance,
365 namespace=namespace,
366 db_dict=db_dict,
367 operation="install",
368 run_once=False,
369 )
370 )
371
372 # wait for execution task
373 await asyncio.wait([exec_task])
374
375 # cancel status task
376 status_task.cancel()
377
378 output, rc = exec_task.result()
379
380 else:
381
382 output, rc = await self._local_async_exec(
383 command=command, raise_exception_on_error=False, env=env
384 )
385
386 # remove temporal values yaml file
387 if file_to_delete:
388 os.remove(file_to_delete)
389
390 # write final status
391 await self._store_status(
392 cluster_id=cluster_id,
393 kdu_instance=kdu_instance,
394 namespace=namespace,
395 db_dict=db_dict,
396 operation="install",
397 run_once=True,
398 check_every=0,
399 )
400
401 if rc != 0:
402 msg = "Error executing command: {}\nOutput: {}".format(command, output)
403 self.log.error(msg)
404 raise K8sException(msg)
405
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000406 async def upgrade(
407 self,
408 cluster_uuid: str,
409 kdu_instance: str,
410 kdu_model: str = None,
411 atomic: bool = True,
412 timeout: float = 300,
413 params: dict = None,
414 db_dict: dict = None,
415 ):
416 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
417 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
418
419 # sync local dir
420 self.fs.sync(from_path=cluster_id)
421
422 # look for instance to obtain namespace
423 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
424 if not instance_info:
425 raise K8sException("kdu_instance {} not found".format(kdu_instance))
426
427 # init env, paths
428 paths, env = self._init_paths_env(
429 cluster_name=cluster_id, create_if_not_exist=True
430 )
431
432 # params to str
433 params_str, file_to_delete = self._params_to_file_option(
434 cluster_id=cluster_id, params=params
435 )
436
437 # version
438 version = None
439 if ":" in kdu_model:
440 parts = kdu_model.split(sep=":")
441 if len(parts) == 2:
442 version = str(parts[1])
443 kdu_model = parts[0]
444
garciadeblas82b591c2021-03-24 09:22:13 +0100445 command = self._get_upgrade_command(
446 kdu_model,
447 kdu_instance,
448 instance_info["namespace"],
449 params_str,
450 version,
451 atomic,
452 timeout,
453 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000454
455 self.log.debug("upgrading: {}".format(command))
456
457 if atomic:
458
459 # exec helm in a task
460 exec_task = asyncio.ensure_future(
461 coro_or_future=self._local_async_exec(
462 command=command, raise_exception_on_error=False, env=env
463 )
464 )
465 # write status in another task
466 status_task = asyncio.ensure_future(
467 coro_or_future=self._store_status(
468 cluster_id=cluster_id,
469 kdu_instance=kdu_instance,
470 namespace=instance_info["namespace"],
471 db_dict=db_dict,
472 operation="upgrade",
473 run_once=False,
474 )
475 )
476
477 # wait for execution task
478 await asyncio.wait([exec_task])
479
480 # cancel status task
481 status_task.cancel()
482 output, rc = exec_task.result()
483
484 else:
485
486 output, rc = await self._local_async_exec(
487 command=command, raise_exception_on_error=False, env=env
488 )
489
490 # remove temporal values yaml file
491 if file_to_delete:
492 os.remove(file_to_delete)
493
494 # write final status
495 await self._store_status(
496 cluster_id=cluster_id,
497 kdu_instance=kdu_instance,
498 namespace=instance_info["namespace"],
499 db_dict=db_dict,
500 operation="upgrade",
501 run_once=True,
502 check_every=0,
503 )
504
505 if rc != 0:
506 msg = "Error executing command: {}\nOutput: {}".format(command, output)
507 self.log.error(msg)
508 raise K8sException(msg)
509
510 # sync fs
511 self.fs.reverse_sync(from_path=cluster_id)
512
513 # return new revision number
514 instance = await self.get_instance_info(
515 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
516 )
517 if instance:
518 revision = int(instance.get("revision"))
519 self.log.debug("New revision: {}".format(revision))
520 return revision
521 else:
522 return 0
523
aktas2962f3e2021-03-15 11:05:35 +0300524 async def scale(
garciadeblas82b591c2021-03-24 09:22:13 +0100525 self,
526 kdu_instance: str,
527 scale: int,
528 resource_name: str,
529 total_timeout: float = 1800,
530 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300531 ):
532 raise NotImplementedError("Method not implemented")
533
534 async def get_scale_count(
garciadeblas82b591c2021-03-24 09:22:13 +0100535 self,
536 resource_name: str,
537 kdu_instance: str,
538 **kwargs,
aktas2962f3e2021-03-15 11:05:35 +0300539 ):
540 raise NotImplementedError("Method not implemented")
541
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000542 async def rollback(
543 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
544 ):
545
546 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
547 self.log.debug(
548 "rollback kdu_instance {} to revision {} from cluster {}".format(
549 kdu_instance, revision, cluster_id
550 )
551 )
552
553 # sync local dir
554 self.fs.sync(from_path=cluster_id)
555
556 # look for instance to obtain namespace
557 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
558 if not instance_info:
559 raise K8sException("kdu_instance {} not found".format(kdu_instance))
560
561 # init env, paths
562 paths, env = self._init_paths_env(
563 cluster_name=cluster_id, create_if_not_exist=True
564 )
565
garciadeblas82b591c2021-03-24 09:22:13 +0100566 command = self._get_rollback_command(
567 kdu_instance, instance_info["namespace"], revision
568 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000569
570 self.log.debug("rolling_back: {}".format(command))
571
572 # exec helm in a task
573 exec_task = asyncio.ensure_future(
574 coro_or_future=self._local_async_exec(
575 command=command, raise_exception_on_error=False, env=env
576 )
577 )
578 # write status in another task
579 status_task = asyncio.ensure_future(
580 coro_or_future=self._store_status(
581 cluster_id=cluster_id,
582 kdu_instance=kdu_instance,
583 namespace=instance_info["namespace"],
584 db_dict=db_dict,
585 operation="rollback",
586 run_once=False,
587 )
588 )
589
590 # wait for execution task
591 await asyncio.wait([exec_task])
592
593 # cancel status task
594 status_task.cancel()
595
596 output, rc = exec_task.result()
597
598 # write final status
599 await self._store_status(
600 cluster_id=cluster_id,
601 kdu_instance=kdu_instance,
602 namespace=instance_info["namespace"],
603 db_dict=db_dict,
604 operation="rollback",
605 run_once=True,
606 check_every=0,
607 )
608
609 if rc != 0:
610 msg = "Error executing command: {}\nOutput: {}".format(command, output)
611 self.log.error(msg)
612 raise K8sException(msg)
613
614 # sync fs
615 self.fs.reverse_sync(from_path=cluster_id)
616
617 # return new revision number
618 instance = await self.get_instance_info(
619 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
620 )
621 if instance:
622 revision = int(instance.get("revision"))
623 self.log.debug("New revision: {}".format(revision))
624 return revision
625 else:
626 return 0
627
David Garciaeb8943a2021-04-12 12:07:37 +0200628 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000629 """
630 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
631 (this call should happen after all _terminate-config-primitive_ of the VNF
632 are invoked).
633
634 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
635 :param kdu_instance: unique name for the KDU instance to be deleted
David Garciaeb8943a2021-04-12 12:07:37 +0200636 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000637 :return: True if successful
638 """
639
640 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
641 self.log.debug(
garciadeblas82b591c2021-03-24 09:22:13 +0100642 "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000643 )
644
645 # sync local dir
646 self.fs.sync(from_path=cluster_id)
647
648 # look for instance to obtain namespace
649 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
650 if not instance_info:
651 raise K8sException("kdu_instance {} not found".format(kdu_instance))
652
653 # init env, paths
654 paths, env = self._init_paths_env(
655 cluster_name=cluster_id, create_if_not_exist=True
656 )
657
658 command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
659 output, _rc = await self._local_async_exec(
660 command=command, raise_exception_on_error=True, env=env
661 )
662
663 # sync fs
664 self.fs.reverse_sync(from_path=cluster_id)
665
666 return self._output_to_table(output)
667
668 async def instances_list(self, cluster_uuid: str) -> list:
669 """
670 returns a list of deployed releases in a cluster
671
672 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
673 :return:
674 """
675
676 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
677 self.log.debug("list releases for cluster {}".format(cluster_id))
678
679 # sync local dir
680 self.fs.sync(from_path=cluster_id)
681
682 # execute internal command
683 result = await self._instances_list(cluster_id)
684
685 # sync fs
686 self.fs.reverse_sync(from_path=cluster_id)
687
688 return result
689
690 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
691 instances = await self.instances_list(cluster_uuid=cluster_uuid)
692 for instance in instances:
693 if instance.get("name") == kdu_instance:
694 return instance
695 self.log.debug("Instance {} not found".format(kdu_instance))
696 return None
697
698 async def exec_primitive(
699 self,
700 cluster_uuid: str = None,
701 kdu_instance: str = None,
702 primitive_name: str = None,
703 timeout: float = 300,
704 params: dict = None,
705 db_dict: dict = None,
David Garciaeb8943a2021-04-12 12:07:37 +0200706 **kwargs,
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000707 ) -> str:
708 """Exec primitive (Juju action)
709
710 :param cluster_uuid: The UUID of the cluster or namespace:cluster
711 :param kdu_instance: The unique name of the KDU instance
712 :param primitive_name: Name of action that will be executed
713 :param timeout: Timeout for action execution
714 :param params: Dictionary of all the parameters needed for the action
715 :db_dict: Dictionary for any additional data
David Garciaeb8943a2021-04-12 12:07:37 +0200716 :param kwargs: Additional parameters (None yet)
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000717
718 :return: Returns the output of the action
719 """
720 raise K8sException(
721 "KDUs deployed with Helm don't support actions "
722 "different from rollback, upgrade and status"
723 )
724
garciadeblas82b591c2021-03-24 09:22:13 +0100725 async def get_services(
726 self, cluster_uuid: str, kdu_instance: str, namespace: str
727 ) -> list:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000728 """
729 Returns a list of services defined for the specified kdu instance.
730
731 :param cluster_uuid: UUID of a K8s cluster known by OSM
732 :param kdu_instance: unique name for the KDU instance
733 :param namespace: K8s namespace used by the KDU instance
734 :return: If successful, it will return a list of services, Each service
735 can have the following data:
736 - `name` of the service
737 - `type` type of service in the k8 cluster
738 - `ports` List of ports offered by the service, for each port includes at least
739 name, port, protocol
740 - `cluster_ip` Internal ip to be used inside k8s cluster
741 - `external_ip` List of external ips (in case they are available)
742 """
743
744 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
745 self.log.debug(
746 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
747 cluster_uuid, kdu_instance
748 )
749 )
750
751 # sync local dir
752 self.fs.sync(from_path=cluster_id)
753
754 # get list of services names for kdu
755 service_names = await self._get_services(cluster_id, kdu_instance, namespace)
756
757 service_list = []
758 for service in service_names:
759 service = await self._get_service(cluster_id, service, namespace)
760 service_list.append(service)
761
762 # sync fs
763 self.fs.reverse_sync(from_path=cluster_id)
764
765 return service_list
766
garciadeblas82b591c2021-03-24 09:22:13 +0100767 async def get_service(
768 self, cluster_uuid: str, service_name: str, namespace: str
769 ) -> object:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000770
771 self.log.debug(
772 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
garciadeblas82b591c2021-03-24 09:22:13 +0100773 service_name, namespace, cluster_uuid
774 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000775 )
776
777 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
778
779 # sync local dir
780 self.fs.sync(from_path=cluster_id)
781
782 service = await self._get_service(cluster_id, service_name, namespace)
783
784 # sync fs
785 self.fs.reverse_sync(from_path=cluster_id)
786
787 return service
788
David Garciaeb8943a2021-04-12 12:07:37 +0200789 async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
790 """
791 This call would retrieve tha current state of a given KDU instance. It would be
792 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
793 values_ of the configuration parameters applied to a given instance. This call
794 would be based on the `status` call.
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000795
David Garciaeb8943a2021-04-12 12:07:37 +0200796 :param cluster_uuid: UUID of a K8s cluster known by OSM
797 :param kdu_instance: unique name for the KDU instance
798 :param kwargs: Additional parameters (None yet)
799 :return: If successful, it will return the following vector of arguments:
800 - K8s `namespace` in the cluster where the KDU lives
801 - `state` of the KDU instance. It can be:
802 - UNKNOWN
803 - DEPLOYED
804 - DELETED
805 - SUPERSEDED
806 - FAILED or
807 - DELETING
808 - List of `resources` (objects) that this release consists of, sorted by kind,
809 and the status of those resources
810 - Last `deployment_time`.
811
812 """
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000813 self.log.debug(
814 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
815 cluster_uuid, kdu_instance
816 )
817 )
818
819 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
820
821 # sync local dir
822 self.fs.sync(from_path=cluster_id)
823
824 # get instance: needed to obtain namespace
825 instances = await self._instances_list(cluster_id=cluster_id)
826 for instance in instances:
827 if instance.get("name") == kdu_instance:
828 break
829 else:
830 # instance does not exist
garciadeblas82b591c2021-03-24 09:22:13 +0100831 raise K8sException(
832 "Instance name: {} not found in cluster: {}".format(
833 kdu_instance, cluster_id
834 )
835 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000836
837 status = await self._status_kdu(
838 cluster_id=cluster_id,
839 kdu_instance=kdu_instance,
840 namespace=instance["namespace"],
841 show_error_log=True,
842 return_text=True,
843 )
844
845 # sync fs
846 self.fs.reverse_sync(from_path=cluster_id)
847
848 return status
849
850 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
851
852 self.log.debug(
853 "inspect kdu_model values {} from (optional) repo: {}".format(
854 kdu_model, repo_url
855 )
856 )
857
858 return await self._exec_inspect_comand(
859 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
860 )
861
862 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
863
864 self.log.debug(
865 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
866 )
867
868 return await self._exec_inspect_comand(
869 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
870 )
871
872 async def synchronize_repos(self, cluster_uuid: str):
873
874 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
875 try:
876 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
877 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
878
879 local_repo_list = await self.repo_list(cluster_uuid)
880 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
881
882 deleted_repo_list = []
883 added_repo_dict = {}
884
885 # iterate over the list of repos in the database that should be
886 # added if not present
887 for repo_name, db_repo in db_repo_dict.items():
888 try:
889 # check if it is already present
890 curr_repo_url = local_repo_dict.get(db_repo["name"])
891 repo_id = db_repo.get("_id")
892 if curr_repo_url != db_repo["url"]:
893 if curr_repo_url:
garciadeblas82b591c2021-03-24 09:22:13 +0100894 self.log.debug(
895 "repo {} url changed, delete and and again".format(
896 db_repo["url"]
897 )
898 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000899 await self.repo_remove(cluster_uuid, db_repo["name"])
900 deleted_repo_list.append(repo_id)
901
902 # add repo
903 self.log.debug("add repo {}".format(db_repo["name"]))
garciadeblas82b591c2021-03-24 09:22:13 +0100904 await self.repo_add(
905 cluster_uuid, db_repo["name"], db_repo["url"]
906 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000907 added_repo_dict[repo_id] = db_repo["name"]
908 except Exception as e:
909 raise K8sException(
910 "Error adding repo id: {}, err_msg: {} ".format(
911 repo_id, repr(e)
912 )
913 )
914
915 # Delete repos that are present but not in nbi_list
916 for repo_name in local_repo_dict:
917 if not db_repo_dict.get(repo_name) and repo_name != "stable":
918 self.log.debug("delete repo {}".format(repo_name))
919 try:
920 await self.repo_remove(cluster_uuid, repo_name)
921 deleted_repo_list.append(repo_name)
922 except Exception as e:
923 self.warning(
924 "Error deleting repo, name: {}, err_msg: {}".format(
925 repo_name, str(e)
926 )
927 )
928
929 return deleted_repo_list, added_repo_dict
930
931 except K8sException:
932 raise
933 except Exception as e:
934 # Do not raise errors synchronizing repos
935 self.log.error("Error synchronizing repos: {}".format(e))
936 raise Exception("Error synchronizing repos: {}".format(e))
937
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000938 def _get_db_repos_dict(self, repo_ids: list):
939 db_repos_dict = {}
940 for repo_id in repo_ids:
941 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
942 db_repos_dict[db_repo["name"]] = db_repo
943 return db_repos_dict
944
945 """
946 ####################################################################################
947 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
948 ####################################################################################
949 """
950
    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs and returns them.
        Also creates the helm3 dirs according to the new directory specification;
        those paths are not returned but assigned to helm environment variables.

        Callers in this class unpack the result as (paths, env), read
        paths["kube_config"] and pass env to the command execution.

        :param cluster_name: cluster_name
        :param create_if_not_exist: presumably whether missing directories are
            created - confirm in the subclasses
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
961
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization.

    Must be implemented by the helm-version specific subclass.

    :param cluster_id: identifier of the cluster to initialize
    :param namespace: namespace to use for the initialization
    :param paths: presumably the paths dictionary returned by _init_paths_env
        (verify against caller)
    :param env: presumably the environment dictionary returned by _init_paths_env
        (verify against caller)
    """
967
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list.

    Must be implemented by the helm-version specific subclass.

    :param cluster_id: identifier of the cluster whose helm instances are listed
    """
973
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace):
    """
    Implements the helm version dependent method to obtain services from a helm
    instance.

    Must be implemented by the helm-version specific subclass.

    :param cluster_id: identifier of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    """
979
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Implements the helm version dependent method to obtain status of a helm instance.

    Must be implemented by the helm-version specific subclass.

    :param cluster_id: identifier of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param show_error_log: presumably forwarded to the command execution helper
        (TODO confirm in the subclass implementations)
    :param return_text: selects the format of the result; when False, _store_status
        reads ``info`` -> ``description`` from the result through ``.get()``, so a
        dictionary-like object is expected in that case
    """
992
993 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +0100994 def _get_install_command(
995 self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
996 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +0000997 """
998 Obtain command to be executed to delete the indicated instance
999 """
1000
1001 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001002 def _get_upgrade_command(
1003 self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
1004 ) -> str:
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001005 """
1006 Obtain command to be executed to upgrade the indicated instance
1007 """
1008
@abc.abstractmethod
def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
    """
    Obtain command to be executed to rollback the indicated instance.

    Must be implemented by the helm-version specific subclass.

    :param kdu_instance: name of the instance to roll back
    :param namespace: namespace of the instance
    :param revision: revision to roll back to
    :return: the command line as a string
    """
1014
@abc.abstractmethod
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Obtain command to be executed to delete the indicated instance.

    Must be implemented by the helm-version specific subclass.

    :param kdu_instance: name of the instance to delete
    :param namespace: namespace of the instance
    :return: the command line as a string
    """
1020
1021 @abc.abstractmethod
garciadeblas82b591c2021-03-24 09:22:13 +01001022 def _get_inspect_command(
1023 self, show_command: str, kdu_model: str, repo_str: str, version: str
1024 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001025 """
1026 Obtain command to be executed to obtain information about the kdu
1027 """
1028
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method called to uninstall the cluster software for helm. This method depends
    on the helm version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: identifier of the cluster
    :param namespace: namespace where the helm cluster software lives
    """
1037
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers.

    Must be implemented by the helm-version specific subclass.

    :param cluster_uuid: identifier of the cluster
    :return: list with the identifiers of the repos associated with the cluster
    """
1043
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001044 """
1045 ####################################################################################
1046 ################################### P R I V A T E ##################################
1047 ####################################################################################
1048 """
1049
1050 @staticmethod
1051 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1052 if os.path.exists(filename):
1053 return True
1054 else:
1055 msg = "File {} does not exist".format(filename)
1056 if exception_if_not_exists:
1057 raise K8sException(msg)
1058
1059 @staticmethod
1060 def _remove_multiple_spaces(strobj):
1061 strobj = strobj.strip()
1062 while " " in strobj:
1063 strobj = strobj.replace(" ", " ")
1064 return strobj
1065
1066 @staticmethod
1067 def _output_to_lines(output: str) -> list:
1068 output_lines = list()
1069 lines = output.splitlines(keepends=False)
1070 for line in lines:
1071 line = line.strip()
1072 if len(line) > 0:
1073 output_lines.append(line)
1074 return output_lines
1075
1076 @staticmethod
1077 def _output_to_table(output: str) -> list:
1078 output_table = list()
1079 lines = output.splitlines(keepends=False)
1080 for line in lines:
1081 line = line.replace("\t", " ")
1082 line_list = list()
1083 output_table.append(line_list)
1084 cells = line.split(sep=" ")
1085 for cell in cells:
1086 cell = cell.strip()
1087 if len(cell) > 0:
1088 line_list.append(cell)
1089 return output_table
1090
1091 @staticmethod
1092 def _parse_services(output: str) -> list:
1093 lines = output.splitlines(keepends=False)
1094 services = []
1095 for line in lines:
1096 line = line.replace("\t", " ")
1097 cells = line.split(sep=" ")
1098 if len(cells) > 0 and cells[0].startswith("service/"):
1099 elems = cells[0].split(sep="/")
1100 if len(elems) > 1:
1101 services.append(elems[1])
1102 return services
1103
1104 @staticmethod
1105 def _get_deep(dictionary: dict, members: tuple):
1106 target = dictionary
1107 value = None
1108 try:
1109 for m in members:
1110 value = target.get(m)
1111 if not value:
1112 return None
1113 else:
1114 target = value
1115 except Exception:
1116 pass
1117 return value
1118
1119 # find key:value in several lines
1120 @staticmethod
1121 def _find_in_lines(p_lines: list, p_key: str) -> str:
1122 for line in p_lines:
1123 try:
1124 if line.startswith(p_key + ":"):
1125 parts = line.split(":")
1126 the_value = parts[1].strip()
1127 return the_value
1128 except Exception:
1129 # ignore it
1130 pass
1131 return None
1132
1133 @staticmethod
1134 def _lower_keys_list(input_list: list):
1135 """
1136 Transform the keys in a list of dictionaries to lower case and returns a new list
1137 of dictionaries
1138 """
1139 new_list = []
1140 for dictionary in input_list:
1141 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1142 new_list.append(new_dict)
1143 return new_list
1144
1145 def _local_exec(self, command: str) -> (str, int):
1146 command = self._remove_multiple_spaces(command)
1147 self.log.debug("Executing sync local command: {}".format(command))
1148 # raise exception if fails
1149 output = ""
1150 try:
1151 output = subprocess.check_output(
1152 command, shell=True, universal_newlines=True
1153 )
1154 return_code = 0
1155 self.log.debug(output)
1156 except Exception:
1157 return_code = 1
1158
1159 return output, return_code
1160
1161 async def _local_async_exec(
1162 self,
1163 command: str,
1164 raise_exception_on_error: bool = False,
1165 show_error_log: bool = True,
1166 encode_utf8: bool = False,
garciadeblas82b591c2021-03-24 09:22:13 +01001167 env: dict = None,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001168 ) -> (str, int):
1169
1170 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
garciadeblas82b591c2021-03-24 09:22:13 +01001171 self.log.debug(
1172 "Executing async local command: {}, env: {}".format(command, env)
1173 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001174
1175 # split command
1176 command = shlex.split(command)
1177
1178 environ = os.environ.copy()
1179 if env:
1180 environ.update(env)
1181
1182 try:
1183 process = await asyncio.create_subprocess_exec(
garciadeblas82b591c2021-03-24 09:22:13 +01001184 *command,
1185 stdout=asyncio.subprocess.PIPE,
1186 stderr=asyncio.subprocess.PIPE,
1187 env=environ,
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001188 )
1189
1190 # wait for command terminate
1191 stdout, stderr = await process.communicate()
1192
1193 return_code = process.returncode
1194
1195 output = ""
1196 if stdout:
1197 output = stdout.decode("utf-8").strip()
1198 # output = stdout.decode()
1199 if stderr:
1200 output = stderr.decode("utf-8").strip()
1201 # output = stderr.decode()
1202
1203 if return_code != 0 and show_error_log:
1204 self.log.debug(
1205 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1206 )
1207 else:
1208 self.log.debug("Return code: {}".format(return_code))
1209
1210 if raise_exception_on_error and return_code != 0:
1211 raise K8sException(output)
1212
1213 if encode_utf8:
1214 output = output.encode("utf-8").strip()
1215 output = str(output).replace("\\n", "\n")
1216
1217 return output, return_code
1218
1219 except asyncio.CancelledError:
1220 raise
1221 except K8sException:
1222 raise
1223 except Exception as e:
1224 msg = "Exception executing command: {} -> {}".format(command, e)
1225 self.log.error(msg)
1226 if raise_exception_on_error:
1227 raise K8sException(e) from e
1228 else:
1229 return "", -1
1230
garciadeblas82b591c2021-03-24 09:22:13 +01001231 async def _local_async_exec_pipe(
1232 self,
1233 command1: str,
1234 command2: str,
1235 raise_exception_on_error: bool = True,
1236 show_error_log: bool = True,
1237 encode_utf8: bool = False,
1238 env: dict = None,
1239 ):
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001240
1241 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1242 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1243 command = "{} | {}".format(command1, command2)
garciadeblas82b591c2021-03-24 09:22:13 +01001244 self.log.debug(
1245 "Executing async local command: {}, env: {}".format(command, env)
1246 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001247
1248 # split command
1249 command1 = shlex.split(command1)
1250 command2 = shlex.split(command2)
1251
1252 environ = os.environ.copy()
1253 if env:
1254 environ.update(env)
1255
1256 try:
1257 read, write = os.pipe()
1258 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1259 os.close(write)
garciadeblas82b591c2021-03-24 09:22:13 +01001260 process_2 = await asyncio.create_subprocess_exec(
1261 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1262 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001263 os.close(read)
1264 stdout, stderr = await process_2.communicate()
1265
1266 return_code = process_2.returncode
1267
1268 output = ""
1269 if stdout:
1270 output = stdout.decode("utf-8").strip()
1271 # output = stdout.decode()
1272 if stderr:
1273 output = stderr.decode("utf-8").strip()
1274 # output = stderr.decode()
1275
1276 if return_code != 0 and show_error_log:
1277 self.log.debug(
1278 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1279 )
1280 else:
1281 self.log.debug("Return code: {}".format(return_code))
1282
1283 if raise_exception_on_error and return_code != 0:
1284 raise K8sException(output)
1285
1286 if encode_utf8:
1287 output = output.encode("utf-8").strip()
1288 output = str(output).replace("\\n", "\n")
1289
1290 return output, return_code
1291 except asyncio.CancelledError:
1292 raise
1293 except K8sException:
1294 raise
1295 except Exception as e:
1296 msg = "Exception executing command: {} -> {}".format(command, e)
1297 self.log.error(msg)
1298 if raise_exception_on_error:
1299 raise K8sException(e) from e
1300 else:
1301 return "", -1
1302
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; it is an empty list while no
    ingress ip is reported
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None when the ingress list is missing or empty, and
        # an ingress entry may come without an "ip" key (see the Kubernetes
        # LoadBalancerIngress reference); neither case may break the query
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if "ip" in elem
        ]

    return service
1346
1347 async def _exec_inspect_comand(
1348 self, inspect_command: str, kdu_model: str, repo_url: str = None
1349 ):
1350 """
1351 Obtains information about a kdu, no cluster (no env)
1352 """
1353
1354 repo_str = ""
1355 if repo_url:
1356 repo_str = " --repo {}".format(repo_url)
1357
1358 idx = kdu_model.find("/")
1359 if idx >= 0:
1360 idx += 1
1361 kdu_model = kdu_model[idx:]
1362
1363 version = ""
1364 if ":" in kdu_model:
1365 parts = kdu_model.split(sep=":")
1366 if len(parts) == 2:
1367 version = "--version {}".format(str(parts[1]))
1368 kdu_model = parts[0]
1369
garciadeblas82b591c2021-03-24 09:22:13 +01001370 full_command = self._get_inspect_command(
1371 inspect_command, kdu_model, repo_str, version
1372 )
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001373 output, _rc = await self._local_async_exec(
1374 command=full_command, encode_utf8=True
1375 )
1376
1377 return output
1378
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Periodically read the status of a helm instance and write it to the database.

    Every iteration sleeps ``check_every`` seconds first, then queries the status
    through _status_kdu and stores it with write_app_status_to_db. The loop ends
    when the task is cancelled, when the database write reports a falsy result,
    or after the first iteration if ``run_once`` is True.

    :param cluster_id: identifier of the cluster
    :param operation: operation name, passed through to write_app_status_to_db
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param check_every: seconds to sleep at the beginning of each iteration
    :param db_dict: passed through to write_app_status_to_db
    :param run_once: perform a single iteration only
    """
    while True:
        try:
            # the wait comes first, so even a run_once call is delayed
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                return_text=False,
            )
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            # cancellation is the regular way of stopping this task
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # any other error is only logged; the next iteration tries again
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )
            pass
        finally:
            # runs on every path above, so run_once ends the loop even after an
            # error
            if run_once:
                return
1421
1422 # params for use in -f file
1423 # returns values file option and filename (in order to delete it at the end)
1424 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1425
1426 if params and len(params) > 0:
garciadeblas82b591c2021-03-24 09:22:13 +01001427 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
lloretgalleg1c83f2e2020-10-22 09:12:35 +00001428
1429 def get_random_number():
1430 r = random.randrange(start=1, stop=99999999)
1431 s = str(r)
1432 while len(s) < 10:
1433 s = "0" + s
1434 return s
1435
1436 params2 = dict()
1437 for key in params:
1438 value = params.get(key)
1439 if "!!yaml" in str(value):
1440 value = yaml.load(value[7:])
1441 params2[key] = value
1442
1443 values_file = get_random_number() + ".yaml"
1444 with open(values_file, "w") as stream:
1445 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1446
1447 return "-f {}".format(values_file), values_file
1448
1449 return "", None
1450
1451 # params for use in --set option
1452 @staticmethod
1453 def _params_to_set_option(params: dict) -> str:
1454 params_str = ""
1455 if params and len(params) > 0:
1456 start = True
1457 for key in params:
1458 value = params.get(key, None)
1459 if value is not None:
1460 if start:
1461 params_str += "--set "
1462 start = False
1463 else:
1464 params_str += ","
1465 params_str += "{}={}".format(key, value)
1466 return params_str
1467
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Generate an instance name from the chart reference.

    The name is made of the chart name with every character that is neither a
    letter nor a number replaced by "-", cut to 35 characters, prefixed with "a"
    if it does not start with a letter, followed by "-" and a random, zero
    padded, 10 digit number. The result is lower case.

    :param kwargs: must contain ``kdu_model``: chart name, absolute path of an
        embedded chart (file or dir), or URL
    :return: generated instance name
    :raises KeyError: if ``kdu_model`` is not given
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep the last path component only
    if chart_name.startswith("/") or "://" in chart_name:
        # a trailing "/" (directory path) would otherwise give an empty name
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )
    name = name[0:35]

    # if empty or not starting with an alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    random_suffix = str(random.randrange(1, 99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_suffix).lower()