Fix black errors
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 # Lock to avoid concurrent execution of helm commands
94 self.cmd_lock = asyncio.Lock()
95
96 def _get_namespace(self, cluster_uuid: str) -> str:
97 """
98 Obtains the namespace used by the cluster with the uuid passed by argument
99
100 param: cluster_uuid: cluster's uuid
101 """
102
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster = self.db.get_one(
105 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
106 )
107 return k8scluster.get("namespace")
108
109 async def init_env(
110 self,
111 k8s_creds: str,
112 namespace: str = "kube-system",
113 reuse_cluster_uuid=None,
114 **kwargs,
115 ) -> (str, bool):
116 """
117 It prepares a given K8s cluster environment to run Charts
118
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
120 '.kube/config'
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
124 :param kwargs: Additional parameters (None yet)
125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
128 """
129
130 if reuse_cluster_uuid:
131 cluster_id = reuse_cluster_uuid
132 else:
133 cluster_id = str(uuid4())
134
135 self.log.debug(
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
137 )
138
139 paths, env = self._init_paths_env(
140 cluster_name=cluster_id, create_if_not_exist=True
141 )
142 mode = stat.S_IRUSR | stat.S_IWUSR
143 with open(paths["kube_config"], "w", mode) as f:
144 f.write(k8s_creds)
145 os.chmod(paths["kube_config"], 0o600)
146
147 # Code with initialization specific of helm version
148 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
149
150 # sync fs with local data
151 self.fs.reverse_sync(from_path=cluster_id)
152
153 self.log.info("Cluster {} initialized".format(cluster_id))
154
155 return cluster_id, n2vc_installed_sw
156
157 async def repo_add(
158 self,
159 cluster_uuid: str,
160 name: str,
161 url: str,
162 repo_type: str = "chart",
163 cert: str = None,
164 user: str = None,
165 password: str = None,
166 ):
167 self.log.debug(
168 "Cluster {}, adding {} repository {}. URL: {}".format(
169 cluster_uuid, repo_type, name, url
170 )
171 )
172
173 # init_env
174 paths, env = self._init_paths_env(
175 cluster_name=cluster_uuid, create_if_not_exist=True
176 )
177
178 # sync local dir
179 self.fs.sync(from_path=cluster_uuid)
180
181 # helm repo add name url
182 command = ("env KUBECONFIG={} {} repo add {} {}").format(
183 paths["kube_config"], self._helm_command, name, url
184 )
185
186 if cert:
187 temp_cert_file = os.path.join(
188 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
189 )
190 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
191 with open(temp_cert_file, "w") as the_cert:
192 the_cert.write(cert)
193 command += " --ca-file {}".format(temp_cert_file)
194
195 if user:
196 command += " --username={}".format(user)
197
198 if password:
199 command += " --password={}".format(password)
200
201 self.log.debug("adding repo: {}".format(command))
202 await self._local_async_exec(
203 command=command, raise_exception_on_error=True, env=env
204 )
205
206 # helm repo update
207 command = "env KUBECONFIG={} {} repo update {}".format(
208 paths["kube_config"], self._helm_command, name
209 )
210 self.log.debug("updating repo: {}".format(command))
211 await self._local_async_exec(
212 command=command, raise_exception_on_error=False, env=env
213 )
214
215 # sync fs
216 self.fs.reverse_sync(from_path=cluster_uuid)
217
218 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
219 self.log.debug(
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid, repo_type, name
222 )
223 )
224
225 # init_env
226 paths, env = self._init_paths_env(
227 cluster_name=cluster_uuid, create_if_not_exist=True
228 )
229
230 # sync local dir
231 self.fs.sync(from_path=cluster_uuid)
232
233 # helm repo update
234 command = "{} repo update {}".format(self._helm_command, name)
235 self.log.debug("updating repo: {}".format(command))
236 await self._local_async_exec(
237 command=command, raise_exception_on_error=False, env=env
238 )
239
240 # sync fs
241 self.fs.reverse_sync(from_path=cluster_uuid)
242
243 async def repo_list(self, cluster_uuid: str) -> list:
244 """
245 Get the list of registered repositories
246
247 :return: list of registered repositories: [ (name, url) .... ]
248 """
249
250 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
251
252 # config filename
253 paths, env = self._init_paths_env(
254 cluster_name=cluster_uuid, create_if_not_exist=True
255 )
256
257 # sync local dir
258 self.fs.sync(from_path=cluster_uuid)
259
260 command = "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths["kube_config"], self._helm_command
262 )
263
264 # Set exception to false because if there are no repos just want an empty list
265 output, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=False, env=env
267 )
268
269 # sync fs
270 self.fs.reverse_sync(from_path=cluster_uuid)
271
272 if _rc == 0:
273 if output and len(output) > 0:
274 repos = yaml.load(output, Loader=yaml.SafeLoader)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self._lower_keys_list(repos)
277 else:
278 return []
279 else:
280 return []
281
282 async def repo_remove(self, cluster_uuid: str, name: str):
283 self.log.debug(
284 "remove {} repositories for cluster {}".format(name, cluster_uuid)
285 )
286
287 # init env, paths
288 paths, env = self._init_paths_env(
289 cluster_name=cluster_uuid, create_if_not_exist=True
290 )
291
292 # sync local dir
293 self.fs.sync(from_path=cluster_uuid)
294
295 command = "env KUBECONFIG={} {} repo remove {}".format(
296 paths["kube_config"], self._helm_command, name
297 )
298 await self._local_async_exec(
299 command=command, raise_exception_on_error=True, env=env
300 )
301
302 # sync fs
303 self.fs.reverse_sync(from_path=cluster_uuid)
304
305 async def reset(
306 self,
307 cluster_uuid: str,
308 force: bool = False,
309 uninstall_sw: bool = False,
310 **kwargs,
311 ) -> bool:
312 """Reset a cluster
313
314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
315
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to force the reset
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
321 """
322 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
323 self.log.debug(
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
325 cluster_uuid, uninstall_sw
326 )
327 )
328
329 # sync local dir
330 self.fs.sync(from_path=cluster_uuid)
331
332 # uninstall releases if needed.
333 if uninstall_sw:
334 releases = await self.instances_list(cluster_uuid=cluster_uuid)
335 if len(releases) > 0:
336 if force:
337 for r in releases:
338 try:
339 kdu_instance = r.get("name")
340 chart = r.get("chart")
341 self.log.debug(
342 "Uninstalling {} -> {}".format(chart, kdu_instance)
343 )
344 await self.uninstall(
345 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
346 )
347 except Exception as e:
348 # will not raise exception as it was found
349 # that in some cases of previously installed helm releases it
350 # raised an error
351 self.log.warn(
352 "Error uninstalling release {}: {}".format(
353 kdu_instance, e
354 )
355 )
356 else:
357 msg = (
358 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
359 ).format(cluster_uuid)
360 self.log.warn(msg)
361 uninstall_sw = (
362 False # Allow to remove k8s cluster without removing Tiller
363 )
364
365 if uninstall_sw:
366 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
367
368 # delete cluster directory
369 self.log.debug("Removing directory {}".format(cluster_uuid))
370 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
371 # Remove also local directorio if still exist
372 direct = self.fs.path + "/" + cluster_uuid
373 shutil.rmtree(direct, ignore_errors=True)
374
375 return True
376
377 def _is_helm_chart_a_file(self, chart_name: str):
378 return chart_name.count("/") > 1
379
380 async def _install_impl(
381 self,
382 cluster_id: str,
383 kdu_model: str,
384 paths: dict,
385 env: dict,
386 kdu_instance: str,
387 atomic: bool = True,
388 timeout: float = 300,
389 params: dict = None,
390 db_dict: dict = None,
391 kdu_name: str = None,
392 namespace: str = None,
393 ):
394 # init env, paths
395 paths, env = self._init_paths_env(
396 cluster_name=cluster_id, create_if_not_exist=True
397 )
398
399 # params to str
400 params_str, file_to_delete = self._params_to_file_option(
401 cluster_id=cluster_id, params=params
402 )
403
404 # version
405 kdu_model, version = self._split_version(kdu_model)
406
407 _, repo = self._split_repo(kdu_model)
408 if repo:
409 await self.repo_update(cluster_id, repo)
410
411 command = self._get_install_command(
412 kdu_model,
413 kdu_instance,
414 namespace,
415 params_str,
416 version,
417 atomic,
418 timeout,
419 paths["kube_config"],
420 )
421
422 self.log.debug("installing: {}".format(command))
423
424 if atomic:
425 # exec helm in a task
426 exec_task = asyncio.ensure_future(
427 coro_or_future=self._local_async_exec(
428 command=command, raise_exception_on_error=False, env=env
429 )
430 )
431
432 # write status in another task
433 status_task = asyncio.ensure_future(
434 coro_or_future=self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
440 )
441 )
442
443 # wait for execution task
444 await asyncio.wait([exec_task])
445
446 # cancel status task
447 status_task.cancel()
448
449 output, rc = exec_task.result()
450
451 else:
452 output, rc = await self._local_async_exec(
453 command=command, raise_exception_on_error=False, env=env
454 )
455
456 # remove temporal values yaml file
457 if file_to_delete:
458 os.remove(file_to_delete)
459
460 # write final status
461 await self._store_status(
462 cluster_id=cluster_id,
463 kdu_instance=kdu_instance,
464 namespace=namespace,
465 db_dict=db_dict,
466 operation="install",
467 )
468
469 if rc != 0:
470 msg = "Error executing command: {}\nOutput: {}".format(command, output)
471 self.log.error(msg)
472 raise K8sException(msg)
473
474 async def upgrade(
475 self,
476 cluster_uuid: str,
477 kdu_instance: str,
478 kdu_model: str = None,
479 atomic: bool = True,
480 timeout: float = 300,
481 params: dict = None,
482 db_dict: dict = None,
483 ):
484 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
485
486 # sync local dir
487 self.fs.sync(from_path=cluster_uuid)
488
489 # look for instance to obtain namespace
490 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
491 if not instance_info:
492 raise K8sException("kdu_instance {} not found".format(kdu_instance))
493
494 # init env, paths
495 paths, env = self._init_paths_env(
496 cluster_name=cluster_uuid, create_if_not_exist=True
497 )
498
499 # sync local dir
500 self.fs.sync(from_path=cluster_uuid)
501
502 # params to str
503 params_str, file_to_delete = self._params_to_file_option(
504 cluster_id=cluster_uuid, params=params
505 )
506
507 # version
508 kdu_model, version = self._split_version(kdu_model)
509
510 _, repo = self._split_repo(kdu_model)
511 if repo:
512 await self.repo_update(cluster_uuid, repo)
513
514 command = self._get_upgrade_command(
515 kdu_model,
516 kdu_instance,
517 instance_info["namespace"],
518 params_str,
519 version,
520 atomic,
521 timeout,
522 paths["kube_config"],
523 )
524
525 self.log.debug("upgrading: {}".format(command))
526
527 if atomic:
528 # exec helm in a task
529 exec_task = asyncio.ensure_future(
530 coro_or_future=self._local_async_exec(
531 command=command, raise_exception_on_error=False, env=env
532 )
533 )
534 # write status in another task
535 status_task = asyncio.ensure_future(
536 coro_or_future=self._store_status(
537 cluster_id=cluster_uuid,
538 kdu_instance=kdu_instance,
539 namespace=instance_info["namespace"],
540 db_dict=db_dict,
541 operation="upgrade",
542 )
543 )
544
545 # wait for execution task
546 await asyncio.wait([exec_task])
547
548 # cancel status task
549 status_task.cancel()
550 output, rc = exec_task.result()
551
552 else:
553 output, rc = await self._local_async_exec(
554 command=command, raise_exception_on_error=False, env=env
555 )
556
557 # remove temporal values yaml file
558 if file_to_delete:
559 os.remove(file_to_delete)
560
561 # write final status
562 await self._store_status(
563 cluster_id=cluster_uuid,
564 kdu_instance=kdu_instance,
565 namespace=instance_info["namespace"],
566 db_dict=db_dict,
567 operation="upgrade",
568 )
569
570 if rc != 0:
571 msg = "Error executing command: {}\nOutput: {}".format(command, output)
572 self.log.error(msg)
573 raise K8sException(msg)
574
575 # sync fs
576 self.fs.reverse_sync(from_path=cluster_uuid)
577
578 # return new revision number
579 instance = await self.get_instance_info(
580 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
581 )
582 if instance:
583 revision = int(instance.get("revision"))
584 self.log.debug("New revision: {}".format(revision))
585 return revision
586 else:
587 return 0
588
589 async def scale(
590 self,
591 kdu_instance: str,
592 scale: int,
593 resource_name: str,
594 total_timeout: float = 1800,
595 cluster_uuid: str = None,
596 kdu_model: str = None,
597 atomic: bool = True,
598 db_dict: dict = None,
599 **kwargs,
600 ):
601 """Scale a resource in a Helm Chart.
602
603 Args:
604 kdu_instance: KDU instance name
605 scale: Scale to which to set the resource
606 resource_name: Resource name
607 total_timeout: The time, in seconds, to wait
608 cluster_uuid: The UUID of the cluster
609 kdu_model: The chart reference
610 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
611 The --wait flag will be set automatically if --atomic is used
612 db_dict: Dictionary for any additional data
613 kwargs: Additional parameters
614
615 Returns:
616 True if successful, False otherwise
617 """
618
619 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
620 if resource_name:
621 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
622 resource_name, kdu_model, cluster_uuid
623 )
624
625 self.log.debug(debug_mgs)
626
627 # look for instance to obtain namespace
628 # get_instance_info function calls the sync command
629 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
630 if not instance_info:
631 raise K8sException("kdu_instance {} not found".format(kdu_instance))
632
633 # init env, paths
634 paths, env = self._init_paths_env(
635 cluster_name=cluster_uuid, create_if_not_exist=True
636 )
637
638 # version
639 kdu_model, version = self._split_version(kdu_model)
640
641 repo_url = await self._find_repo(kdu_model, cluster_uuid)
642
643 _, replica_str = await self._get_replica_count_url(
644 kdu_model, repo_url, resource_name
645 )
646
647 command = self._get_upgrade_scale_command(
648 kdu_model,
649 kdu_instance,
650 instance_info["namespace"],
651 scale,
652 version,
653 atomic,
654 replica_str,
655 total_timeout,
656 resource_name,
657 paths["kube_config"],
658 )
659
660 self.log.debug("scaling: {}".format(command))
661
662 if atomic:
663 # exec helm in a task
664 exec_task = asyncio.ensure_future(
665 coro_or_future=self._local_async_exec(
666 command=command, raise_exception_on_error=False, env=env
667 )
668 )
669 # write status in another task
670 status_task = asyncio.ensure_future(
671 coro_or_future=self._store_status(
672 cluster_id=cluster_uuid,
673 kdu_instance=kdu_instance,
674 namespace=instance_info["namespace"],
675 db_dict=db_dict,
676 operation="scale",
677 )
678 )
679
680 # wait for execution task
681 await asyncio.wait([exec_task])
682
683 # cancel status task
684 status_task.cancel()
685 output, rc = exec_task.result()
686
687 else:
688 output, rc = await self._local_async_exec(
689 command=command, raise_exception_on_error=False, env=env
690 )
691
692 # write final status
693 await self._store_status(
694 cluster_id=cluster_uuid,
695 kdu_instance=kdu_instance,
696 namespace=instance_info["namespace"],
697 db_dict=db_dict,
698 operation="scale",
699 )
700
701 if rc != 0:
702 msg = "Error executing command: {}\nOutput: {}".format(command, output)
703 self.log.error(msg)
704 raise K8sException(msg)
705
706 # sync fs
707 self.fs.reverse_sync(from_path=cluster_uuid)
708
709 return True
710
711 async def get_scale_count(
712 self,
713 resource_name: str,
714 kdu_instance: str,
715 cluster_uuid: str,
716 kdu_model: str,
717 **kwargs,
718 ) -> int:
719 """Get a resource scale count.
720
721 Args:
722 cluster_uuid: The UUID of the cluster
723 resource_name: Resource name
724 kdu_instance: KDU instance name
725 kdu_model: The name or path of an Helm Chart
726 kwargs: Additional parameters
727
728 Returns:
729 Resource instance count
730 """
731
732 self.log.debug(
733 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
734 )
735
736 # look for instance to obtain namespace
737 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
738 if not instance_info:
739 raise K8sException("kdu_instance {} not found".format(kdu_instance))
740
741 # init env, paths
742 paths, _ = self._init_paths_env(
743 cluster_name=cluster_uuid, create_if_not_exist=True
744 )
745
746 replicas = await self._get_replica_count_instance(
747 kdu_instance=kdu_instance,
748 namespace=instance_info["namespace"],
749 kubeconfig=paths["kube_config"],
750 resource_name=resource_name,
751 )
752
753 self.log.debug(
754 f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
755 )
756
757 # Get default value if scale count is not found from provided values
758 # Important note: this piece of code shall only be executed in the first scaling operation,
759 # since it is expected that the _get_replica_count_instance is able to obtain the number of
760 # replicas when a scale operation was already conducted previously for this KDU/resource!
761 if replicas is None:
762 repo_url = await self._find_repo(
763 kdu_model=kdu_model, cluster_uuid=cluster_uuid
764 )
765 replicas, _ = await self._get_replica_count_url(
766 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
767 )
768
769 self.log.debug(
770 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
771 f"{resource_name} obtained: {replicas}"
772 )
773
774 if replicas is None:
775 msg = "Replica count not found. Cannot be scaled"
776 self.log.error(msg)
777 raise K8sException(msg)
778
779 return int(replicas)
780
781 async def rollback(
782 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
783 ):
784 self.log.debug(
785 "rollback kdu_instance {} to revision {} from cluster {}".format(
786 kdu_instance, revision, cluster_uuid
787 )
788 )
789
790 # sync local dir
791 self.fs.sync(from_path=cluster_uuid)
792
793 # look for instance to obtain namespace
794 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
795 if not instance_info:
796 raise K8sException("kdu_instance {} not found".format(kdu_instance))
797
798 # init env, paths
799 paths, env = self._init_paths_env(
800 cluster_name=cluster_uuid, create_if_not_exist=True
801 )
802
803 # sync local dir
804 self.fs.sync(from_path=cluster_uuid)
805
806 command = self._get_rollback_command(
807 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
808 )
809
810 self.log.debug("rolling_back: {}".format(command))
811
812 # exec helm in a task
813 exec_task = asyncio.ensure_future(
814 coro_or_future=self._local_async_exec(
815 command=command, raise_exception_on_error=False, env=env
816 )
817 )
818 # write status in another task
819 status_task = asyncio.ensure_future(
820 coro_or_future=self._store_status(
821 cluster_id=cluster_uuid,
822 kdu_instance=kdu_instance,
823 namespace=instance_info["namespace"],
824 db_dict=db_dict,
825 operation="rollback",
826 )
827 )
828
829 # wait for execution task
830 await asyncio.wait([exec_task])
831
832 # cancel status task
833 status_task.cancel()
834
835 output, rc = exec_task.result()
836
837 # write final status
838 await self._store_status(
839 cluster_id=cluster_uuid,
840 kdu_instance=kdu_instance,
841 namespace=instance_info["namespace"],
842 db_dict=db_dict,
843 operation="rollback",
844 )
845
846 if rc != 0:
847 msg = "Error executing command: {}\nOutput: {}".format(command, output)
848 self.log.error(msg)
849 raise K8sException(msg)
850
851 # sync fs
852 self.fs.reverse_sync(from_path=cluster_uuid)
853
854 # return new revision number
855 instance = await self.get_instance_info(
856 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
857 )
858 if instance:
859 revision = int(instance.get("revision"))
860 self.log.debug("New revision: {}".format(revision))
861 return revision
862 else:
863 return 0
864
865 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
866 """
867 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
868 (this call should happen after all _terminate-config-primitive_ of the VNF
869 are invoked).
870
871 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
872 :param kdu_instance: unique name for the KDU instance to be deleted
873 :param kwargs: Additional parameters (None yet)
874 :return: True if successful
875 """
876
877 self.log.debug(
878 "uninstall kdu_instance {} from cluster {}".format(
879 kdu_instance, cluster_uuid
880 )
881 )
882
883 # sync local dir
884 self.fs.sync(from_path=cluster_uuid)
885
886 # look for instance to obtain namespace
887 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
888 if not instance_info:
889 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
890 return True
891 # init env, paths
892 paths, env = self._init_paths_env(
893 cluster_name=cluster_uuid, create_if_not_exist=True
894 )
895
896 # sync local dir
897 self.fs.sync(from_path=cluster_uuid)
898
899 command = self._get_uninstall_command(
900 kdu_instance, instance_info["namespace"], paths["kube_config"]
901 )
902 output, _rc = await self._local_async_exec(
903 command=command, raise_exception_on_error=True, env=env
904 )
905
906 # sync fs
907 self.fs.reverse_sync(from_path=cluster_uuid)
908
909 return self._output_to_table(output)
910
911 async def instances_list(self, cluster_uuid: str) -> list:
912 """
913 returns a list of deployed releases in a cluster
914
915 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
916 :return:
917 """
918
919 self.log.debug("list releases for cluster {}".format(cluster_uuid))
920
921 # sync local dir
922 self.fs.sync(from_path=cluster_uuid)
923
924 # execute internal command
925 result = await self._instances_list(cluster_uuid)
926
927 # sync fs
928 self.fs.reverse_sync(from_path=cluster_uuid)
929
930 return result
931
932 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
933 instances = await self.instances_list(cluster_uuid=cluster_uuid)
934 for instance in instances:
935 if instance.get("name") == kdu_instance:
936 return instance
937 self.log.debug("Instance {} not found".format(kdu_instance))
938 return None
939
940 async def upgrade_charm(
941 self,
942 ee_id: str = None,
943 path: str = None,
944 charm_id: str = None,
945 charm_type: str = None,
946 timeout: float = None,
947 ) -> str:
948 """This method upgrade charms in VNFs
949
950 Args:
951 ee_id: Execution environment id
952 path: Local path to the charm
953 charm_id: charm-id
954 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
955 timeout: (Float) Timeout for the ns update operation
956
957 Returns:
958 The output of the update operation if status equals to "completed"
959 """
960 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
961
962 async def exec_primitive(
963 self,
964 cluster_uuid: str = None,
965 kdu_instance: str = None,
966 primitive_name: str = None,
967 timeout: float = 300,
968 params: dict = None,
969 db_dict: dict = None,
970 **kwargs,
971 ) -> str:
972 """Exec primitive (Juju action)
973
974 :param cluster_uuid: The UUID of the cluster or namespace:cluster
975 :param kdu_instance: The unique name of the KDU instance
976 :param primitive_name: Name of action that will be executed
977 :param timeout: Timeout for action execution
978 :param params: Dictionary of all the parameters needed for the action
979 :db_dict: Dictionary for any additional data
980 :param kwargs: Additional parameters (None yet)
981
982 :return: Returns the output of the action
983 """
984 raise K8sException(
985 "KDUs deployed with Helm don't support actions "
986 "different from rollback, upgrade and status"
987 )
988
989 async def get_services(
990 self, cluster_uuid: str, kdu_instance: str, namespace: str
991 ) -> list:
992 """
993 Returns a list of services defined for the specified kdu instance.
994
995 :param cluster_uuid: UUID of a K8s cluster known by OSM
996 :param kdu_instance: unique name for the KDU instance
997 :param namespace: K8s namespace used by the KDU instance
998 :return: If successful, it will return a list of services, Each service
999 can have the following data:
1000 - `name` of the service
1001 - `type` type of service in the k8 cluster
1002 - `ports` List of ports offered by the service, for each port includes at least
1003 name, port, protocol
1004 - `cluster_ip` Internal ip to be used inside k8s cluster
1005 - `external_ip` List of external ips (in case they are available)
1006 """
1007
1008 self.log.debug(
1009 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1010 cluster_uuid, kdu_instance
1011 )
1012 )
1013
1014 # init env, paths
1015 paths, env = self._init_paths_env(
1016 cluster_name=cluster_uuid, create_if_not_exist=True
1017 )
1018
1019 # sync local dir
1020 self.fs.sync(from_path=cluster_uuid)
1021
1022 # get list of services names for kdu
1023 service_names = await self._get_services(
1024 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1025 )
1026
1027 service_list = []
1028 for service in service_names:
1029 service = await self._get_service(cluster_uuid, service, namespace)
1030 service_list.append(service)
1031
1032 # sync fs
1033 self.fs.reverse_sync(from_path=cluster_uuid)
1034
1035 return service_list
1036
1037 async def get_service(
1038 self, cluster_uuid: str, service_name: str, namespace: str
1039 ) -> object:
1040 self.log.debug(
1041 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1042 service_name, namespace, cluster_uuid
1043 )
1044 )
1045
1046 # sync local dir
1047 self.fs.sync(from_path=cluster_uuid)
1048
1049 service = await self._get_service(cluster_uuid, service_name, namespace)
1050
1051 # sync fs
1052 self.fs.reverse_sync(from_path=cluster_uuid)
1053
1054 return service
1055
1056 async def status_kdu(
1057 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1058 ) -> Union[str, dict]:
1059 """
1060 This call would retrieve tha current state of a given KDU instance. It would be
1061 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1062 values_ of the configuration parameters applied to a given instance. This call
1063 would be based on the `status` call.
1064
1065 :param cluster_uuid: UUID of a K8s cluster known by OSM
1066 :param kdu_instance: unique name for the KDU instance
1067 :param kwargs: Additional parameters (None yet)
1068 :param yaml_format: if the return shall be returned as an YAML string or as a
1069 dictionary
1070 :return: If successful, it will return the following vector of arguments:
1071 - K8s `namespace` in the cluster where the KDU lives
1072 - `state` of the KDU instance. It can be:
1073 - UNKNOWN
1074 - DEPLOYED
1075 - DELETED
1076 - SUPERSEDED
1077 - FAILED or
1078 - DELETING
1079 - List of `resources` (objects) that this release consists of, sorted by kind,
1080 and the status of those resources
1081 - Last `deployment_time`.
1082
1083 """
1084 self.log.debug(
1085 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1086 cluster_uuid, kdu_instance
1087 )
1088 )
1089
1090 # sync local dir
1091 self.fs.sync(from_path=cluster_uuid)
1092
1093 # get instance: needed to obtain namespace
1094 instances = await self._instances_list(cluster_id=cluster_uuid)
1095 for instance in instances:
1096 if instance.get("name") == kdu_instance:
1097 break
1098 else:
1099 # instance does not exist
1100 raise K8sException(
1101 "Instance name: {} not found in cluster: {}".format(
1102 kdu_instance, cluster_uuid
1103 )
1104 )
1105
1106 status = await self._status_kdu(
1107 cluster_id=cluster_uuid,
1108 kdu_instance=kdu_instance,
1109 namespace=instance["namespace"],
1110 yaml_format=yaml_format,
1111 show_error_log=True,
1112 )
1113
1114 # sync fs
1115 self.fs.reverse_sync(from_path=cluster_uuid)
1116
1117 return status
1118
1119 async def get_values_kdu(
1120 self, kdu_instance: str, namespace: str, kubeconfig: str
1121 ) -> str:
1122 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1123
1124 return await self._exec_get_command(
1125 get_command="values",
1126 kdu_instance=kdu_instance,
1127 namespace=namespace,
1128 kubeconfig=kubeconfig,
1129 )
1130
1131 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1132 """Method to obtain the Helm Chart package's values
1133
1134 Args:
1135 kdu_model: The name or path of an Helm Chart
1136 repo_url: Helm Chart repository url
1137
1138 Returns:
1139 str: the values of the Helm Chart package
1140 """
1141
1142 self.log.debug(
1143 "inspect kdu_model values {} from (optional) repo: {}".format(
1144 kdu_model, repo_url
1145 )
1146 )
1147
1148 return await self._exec_inspect_command(
1149 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1150 )
1151
1152 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1153 self.log.debug(
1154 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1155 )
1156
1157 return await self._exec_inspect_command(
1158 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1159 )
1160
1161 async def synchronize_repos(self, cluster_uuid: str):
1162 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1163 try:
1164 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1165 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1166
1167 local_repo_list = await self.repo_list(cluster_uuid)
1168 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1169
1170 deleted_repo_list = []
1171 added_repo_dict = {}
1172
1173 # iterate over the list of repos in the database that should be
1174 # added if not present
1175 for repo_name, db_repo in db_repo_dict.items():
1176 try:
1177 # check if it is already present
1178 curr_repo_url = local_repo_dict.get(db_repo["name"])
1179 repo_id = db_repo.get("_id")
1180 if curr_repo_url != db_repo["url"]:
1181 if curr_repo_url:
1182 self.log.debug(
1183 "repo {} url changed, delete and and again".format(
1184 db_repo["url"]
1185 )
1186 )
1187 await self.repo_remove(cluster_uuid, db_repo["name"])
1188 deleted_repo_list.append(repo_id)
1189
1190 # add repo
1191 self.log.debug("add repo {}".format(db_repo["name"]))
1192 if "ca_cert" in db_repo:
1193 await self.repo_add(
1194 cluster_uuid,
1195 db_repo["name"],
1196 db_repo["url"],
1197 cert=db_repo["ca_cert"],
1198 )
1199 else:
1200 await self.repo_add(
1201 cluster_uuid,
1202 db_repo["name"],
1203 db_repo["url"],
1204 )
1205 added_repo_dict[repo_id] = db_repo["name"]
1206 except Exception as e:
1207 raise K8sException(
1208 "Error adding repo id: {}, err_msg: {} ".format(
1209 repo_id, repr(e)
1210 )
1211 )
1212
1213 # Delete repos that are present but not in nbi_list
1214 for repo_name in local_repo_dict:
1215 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1216 self.log.debug("delete repo {}".format(repo_name))
1217 try:
1218 await self.repo_remove(cluster_uuid, repo_name)
1219 deleted_repo_list.append(repo_name)
1220 except Exception as e:
1221 self.warning(
1222 "Error deleting repo, name: {}, err_msg: {}".format(
1223 repo_name, str(e)
1224 )
1225 )
1226
1227 return deleted_repo_list, added_repo_dict
1228
1229 except K8sException:
1230 raise
1231 except Exception as e:
1232 # Do not raise errors synchronizing repos
1233 self.log.error("Error synchronizing repos: {}".format(e))
1234 raise Exception("Error synchronizing repos: {}".format(e))
1235
1236 def _get_db_repos_dict(self, repo_ids: list):
1237 db_repos_dict = {}
1238 for repo_id in repo_ids:
1239 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1240 db_repos_dict[db_repo["name"]] = db_repo
1241 return db_repos_dict
1242
1243 """
1244 ####################################################################################
1245 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1246 ####################################################################################
1247 """
1248
1249 @abc.abstractmethod
1250 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1251 """
1252 Creates and returns base cluster and kube dirs and returns them.
1253 Also created helm3 dirs according to new directory specification, paths are
1254 not returned but assigned to helm environment variables
1255
1256 :param cluster_name: cluster_name
1257 :return: Dictionary with config_paths and dictionary with helm environment variables
1258 """
1259
1260 @abc.abstractmethod
1261 async def _cluster_init(self, cluster_id, namespace, paths, env):
1262 """
1263 Implements the helm version dependent cluster initialization
1264 """
1265
1266 @abc.abstractmethod
1267 async def _instances_list(self, cluster_id):
1268 """
1269 Implements the helm version dependent helm instances list
1270 """
1271
1272 @abc.abstractmethod
1273 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1274 """
1275 Implements the helm version dependent method to obtain services from a helm instance
1276 """
1277
1278 @abc.abstractmethod
1279 async def _status_kdu(
1280 self,
1281 cluster_id: str,
1282 kdu_instance: str,
1283 namespace: str = None,
1284 yaml_format: bool = False,
1285 show_error_log: bool = False,
1286 ) -> Union[str, dict]:
1287 """
1288 Implements the helm version dependent method to obtain status of a helm instance
1289 """
1290
1291 @abc.abstractmethod
1292 def _get_install_command(
1293 self,
1294 kdu_model,
1295 kdu_instance,
1296 namespace,
1297 params_str,
1298 version,
1299 atomic,
1300 timeout,
1301 kubeconfig,
1302 ) -> str:
1303 """
1304 Obtain command to be executed to delete the indicated instance
1305 """
1306
1307 @abc.abstractmethod
1308 def _get_upgrade_scale_command(
1309 self,
1310 kdu_model,
1311 kdu_instance,
1312 namespace,
1313 count,
1314 version,
1315 atomic,
1316 replicas,
1317 timeout,
1318 resource_name,
1319 kubeconfig,
1320 ) -> str:
1321 """Generates the command to scale a Helm Chart release
1322
1323 Args:
1324 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1325 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1326 namespace (str): Namespace where this KDU instance is deployed
1327 scale (int): Scale count
1328 version (str): Constraint with specific version of the Chart to use
1329 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1330 The --wait flag will be set automatically if --atomic is used
1331 replica_str (str): The key under resource_name key where the scale count is stored
1332 timeout (float): The time, in seconds, to wait
1333 resource_name (str): The KDU's resource to scale
1334 kubeconfig (str): Kubeconfig file path
1335
1336 Returns:
1337 str: command to scale a Helm Chart release
1338 """
1339
1340 @abc.abstractmethod
1341 def _get_upgrade_command(
1342 self,
1343 kdu_model,
1344 kdu_instance,
1345 namespace,
1346 params_str,
1347 version,
1348 atomic,
1349 timeout,
1350 kubeconfig,
1351 ) -> str:
1352 """Generates the command to upgrade a Helm Chart release
1353
1354 Args:
1355 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1356 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1357 namespace (str): Namespace where this KDU instance is deployed
1358 params_str (str): Params used to upgrade the Helm Chart release
1359 version (str): Constraint with specific version of the Chart to use
1360 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1361 The --wait flag will be set automatically if --atomic is used
1362 timeout (float): The time, in seconds, to wait
1363 kubeconfig (str): Kubeconfig file path
1364
1365 Returns:
1366 str: command to upgrade a Helm Chart release
1367 """
1368
1369 @abc.abstractmethod
1370 def _get_rollback_command(
1371 self, kdu_instance, namespace, revision, kubeconfig
1372 ) -> str:
1373 """
1374 Obtain command to be executed to rollback the indicated instance
1375 """
1376
1377 @abc.abstractmethod
1378 def _get_uninstall_command(
1379 self, kdu_instance: str, namespace: str, kubeconfig: str
1380 ) -> str:
1381 """
1382 Obtain command to be executed to delete the indicated instance
1383 """
1384
1385 @abc.abstractmethod
1386 def _get_inspect_command(
1387 self, show_command: str, kdu_model: str, repo_str: str, version: str
1388 ):
1389 """Generates the command to obtain the information about an Helm Chart package
1390 (´helm show ...´ command)
1391
1392 Args:
1393 show_command: the second part of the command (`helm show <show_command>`)
1394 kdu_model: The name or path of an Helm Chart
1395 repo_url: Helm Chart repository url
1396 version: constraint with specific version of the Chart to use
1397
1398 Returns:
1399 str: the generated Helm Chart command
1400 """
1401
1402 @abc.abstractmethod
1403 def _get_get_command(
1404 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1405 ):
1406 """Obtain command to be executed to get information about the kdu instance."""
1407
1408 @abc.abstractmethod
1409 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1410 """
1411 Method call to uninstall cluster software for helm. This method is dependent
1412 of helm version
1413 For Helm v2 it will be called when Tiller must be uninstalled
1414 For Helm v3 it does nothing and does not need to be callled
1415 """
1416
1417 @abc.abstractmethod
1418 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1419 """
1420 Obtains the cluster repos identifiers
1421 """
1422
1423 """
1424 ####################################################################################
1425 ################################### P R I V A T E ##################################
1426 ####################################################################################
1427 """
1428
1429 @staticmethod
1430 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1431 if os.path.exists(filename):
1432 return True
1433 else:
1434 msg = "File {} does not exist".format(filename)
1435 if exception_if_not_exists:
1436 raise K8sException(msg)
1437
1438 @staticmethod
1439 def _remove_multiple_spaces(strobj):
1440 strobj = strobj.strip()
1441 while " " in strobj:
1442 strobj = strobj.replace(" ", " ")
1443 return strobj
1444
1445 @staticmethod
1446 def _output_to_lines(output: str) -> list:
1447 output_lines = list()
1448 lines = output.splitlines(keepends=False)
1449 for line in lines:
1450 line = line.strip()
1451 if len(line) > 0:
1452 output_lines.append(line)
1453 return output_lines
1454
1455 @staticmethod
1456 def _output_to_table(output: str) -> list:
1457 output_table = list()
1458 lines = output.splitlines(keepends=False)
1459 for line in lines:
1460 line = line.replace("\t", " ")
1461 line_list = list()
1462 output_table.append(line_list)
1463 cells = line.split(sep=" ")
1464 for cell in cells:
1465 cell = cell.strip()
1466 if len(cell) > 0:
1467 line_list.append(cell)
1468 return output_table
1469
1470 @staticmethod
1471 def _parse_services(output: str) -> list:
1472 lines = output.splitlines(keepends=False)
1473 services = []
1474 for line in lines:
1475 line = line.replace("\t", " ")
1476 cells = line.split(sep=" ")
1477 if len(cells) > 0 and cells[0].startswith("service/"):
1478 elems = cells[0].split(sep="/")
1479 if len(elems) > 1:
1480 services.append(elems[1])
1481 return services
1482
1483 @staticmethod
1484 def _get_deep(dictionary: dict, members: tuple):
1485 target = dictionary
1486 value = None
1487 try:
1488 for m in members:
1489 value = target.get(m)
1490 if not value:
1491 return None
1492 else:
1493 target = value
1494 except Exception:
1495 pass
1496 return value
1497
1498 # find key:value in several lines
1499 @staticmethod
1500 def _find_in_lines(p_lines: list, p_key: str) -> str:
1501 for line in p_lines:
1502 try:
1503 if line.startswith(p_key + ":"):
1504 parts = line.split(":")
1505 the_value = parts[1].strip()
1506 return the_value
1507 except Exception:
1508 # ignore it
1509 pass
1510 return None
1511
1512 @staticmethod
1513 def _lower_keys_list(input_list: list):
1514 """
1515 Transform the keys in a list of dictionaries to lower case and returns a new list
1516 of dictionaries
1517 """
1518 new_list = []
1519 if input_list:
1520 for dictionary in input_list:
1521 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1522 new_list.append(new_dict)
1523 return new_list
1524
1525 async def _local_async_exec(
1526 self,
1527 command: str,
1528 raise_exception_on_error: bool = False,
1529 show_error_log: bool = True,
1530 encode_utf8: bool = False,
1531 env: dict = None,
1532 ) -> (str, int):
1533 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1534 self.log.debug(
1535 "Executing async local command: {}, env: {}".format(command, env)
1536 )
1537
1538 # split command
1539 command = shlex.split(command)
1540
1541 environ = os.environ.copy()
1542 if env:
1543 environ.update(env)
1544
1545 try:
1546 async with self.cmd_lock:
1547 process = await asyncio.create_subprocess_exec(
1548 *command,
1549 stdout=asyncio.subprocess.PIPE,
1550 stderr=asyncio.subprocess.PIPE,
1551 env=environ,
1552 )
1553
1554 # wait for command terminate
1555 stdout, stderr = await process.communicate()
1556
1557 return_code = process.returncode
1558
1559 output = ""
1560 if stdout:
1561 output = stdout.decode("utf-8").strip()
1562 # output = stdout.decode()
1563 if stderr:
1564 output = stderr.decode("utf-8").strip()
1565 # output = stderr.decode()
1566
1567 if return_code != 0 and show_error_log:
1568 self.log.debug(
1569 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1570 )
1571 else:
1572 self.log.debug("Return code: {}".format(return_code))
1573
1574 if raise_exception_on_error and return_code != 0:
1575 raise K8sException(output)
1576
1577 if encode_utf8:
1578 output = output.encode("utf-8").strip()
1579 output = str(output).replace("\\n", "\n")
1580
1581 return output, return_code
1582
1583 except asyncio.CancelledError:
1584 raise
1585 except K8sException:
1586 raise
1587 except Exception as e:
1588 msg = "Exception executing command: {} -> {}".format(command, e)
1589 self.log.error(msg)
1590 if raise_exception_on_error:
1591 raise K8sException(e) from e
1592 else:
1593 return "", -1
1594
1595 async def _local_async_exec_pipe(
1596 self,
1597 command1: str,
1598 command2: str,
1599 raise_exception_on_error: bool = True,
1600 show_error_log: bool = True,
1601 encode_utf8: bool = False,
1602 env: dict = None,
1603 ):
1604 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1605 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1606 command = "{} | {}".format(command1, command2)
1607 self.log.debug(
1608 "Executing async local command: {}, env: {}".format(command, env)
1609 )
1610
1611 # split command
1612 command1 = shlex.split(command1)
1613 command2 = shlex.split(command2)
1614
1615 environ = os.environ.copy()
1616 if env:
1617 environ.update(env)
1618
1619 try:
1620 async with self.cmd_lock:
1621 read, write = os.pipe()
1622 await asyncio.create_subprocess_exec(
1623 *command1, stdout=write, env=environ
1624 )
1625 os.close(write)
1626 process_2 = await asyncio.create_subprocess_exec(
1627 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1628 )
1629 os.close(read)
1630 stdout, stderr = await process_2.communicate()
1631
1632 return_code = process_2.returncode
1633
1634 output = ""
1635 if stdout:
1636 output = stdout.decode("utf-8").strip()
1637 # output = stdout.decode()
1638 if stderr:
1639 output = stderr.decode("utf-8").strip()
1640 # output = stderr.decode()
1641
1642 if return_code != 0 and show_error_log:
1643 self.log.debug(
1644 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1645 )
1646 else:
1647 self.log.debug("Return code: {}".format(return_code))
1648
1649 if raise_exception_on_error and return_code != 0:
1650 raise K8sException(output)
1651
1652 if encode_utf8:
1653 output = output.encode("utf-8").strip()
1654 output = str(output).replace("\\n", "\n")
1655
1656 return output, return_code
1657 except asyncio.CancelledError:
1658 raise
1659 except K8sException:
1660 raise
1661 except Exception as e:
1662 msg = "Exception executing command: {} -> {}".format(command, e)
1663 self.log.error(msg)
1664 if raise_exception_on_error:
1665 raise K8sException(e) from e
1666 else:
1667 return "", -1
1668
1669 async def _get_service(self, cluster_id, service_name, namespace):
1670 """
1671 Obtains the data of the specified service in the k8cluster.
1672
1673 :param cluster_id: id of a K8s cluster known by OSM
1674 :param service_name: name of the K8s service in the specified namespace
1675 :param namespace: K8s namespace used by the KDU instance
1676 :return: If successful, it will return a service with the following data:
1677 - `name` of the service
1678 - `type` type of service in the k8 cluster
1679 - `ports` List of ports offered by the service, for each port includes at least
1680 name, port, protocol
1681 - `cluster_ip` Internal ip to be used inside k8s cluster
1682 - `external_ip` List of external ips (in case they are available)
1683 """
1684
1685 # init config, env
1686 paths, env = self._init_paths_env(
1687 cluster_name=cluster_id, create_if_not_exist=True
1688 )
1689
1690 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1691 self.kubectl_command, paths["kube_config"], namespace, service_name
1692 )
1693
1694 output, _rc = await self._local_async_exec(
1695 command=command, raise_exception_on_error=True, env=env
1696 )
1697
1698 data = yaml.load(output, Loader=yaml.SafeLoader)
1699
1700 service = {
1701 "name": service_name,
1702 "type": self._get_deep(data, ("spec", "type")),
1703 "ports": self._get_deep(data, ("spec", "ports")),
1704 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1705 }
1706 if service["type"] == "LoadBalancer":
1707 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1708 ip_list = [elem["ip"] for elem in ip_map_list]
1709 service["external_ip"] = ip_list
1710
1711 return service
1712
1713 async def _exec_get_command(
1714 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1715 ):
1716 """Obtains information about the kdu instance."""
1717
1718 full_command = self._get_get_command(
1719 get_command, kdu_instance, namespace, kubeconfig
1720 )
1721
1722 output, _rc = await self._local_async_exec(command=full_command)
1723
1724 return output
1725
1726 async def _exec_inspect_command(
1727 self, inspect_command: str, kdu_model: str, repo_url: str = None
1728 ):
1729 """Obtains information about an Helm Chart package (´helm show´ command)
1730
1731 Args:
1732 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1733 kdu_model: The name or path of an Helm Chart
1734 repo_url: Helm Chart repository url
1735
1736 Returns:
1737 str: the requested info about the Helm Chart package
1738 """
1739
1740 repo_str = ""
1741 if repo_url:
1742 repo_str = " --repo {}".format(repo_url)
1743
1744 # Obtain the Chart's name and store it in the var kdu_model
1745 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1746
1747 kdu_model, version = self._split_version(kdu_model)
1748 if version:
1749 version_str = "--version {}".format(version)
1750 else:
1751 version_str = ""
1752
1753 full_command = self._get_inspect_command(
1754 show_command=inspect_command,
1755 kdu_model=kdu_model,
1756 repo_str=repo_str,
1757 version=version_str,
1758 )
1759
1760 output, _ = await self._local_async_exec(command=full_command)
1761
1762 return output
1763
1764 async def _get_replica_count_url(
1765 self,
1766 kdu_model: str,
1767 repo_url: str = None,
1768 resource_name: str = None,
1769 ) -> (int, str):
1770 """Get the replica count value in the Helm Chart Values.
1771
1772 Args:
1773 kdu_model: The name or path of an Helm Chart
1774 repo_url: Helm Chart repository url
1775 resource_name: Resource name
1776
1777 Returns:
1778 A tuple with:
1779 - The number of replicas of the specific instance; if not found, returns None; and
1780 - The string corresponding to the replica count key in the Helm values
1781 """
1782
1783 kdu_values = yaml.load(
1784 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1785 Loader=yaml.SafeLoader,
1786 )
1787
1788 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1789
1790 if not kdu_values:
1791 raise K8sException(
1792 "kdu_values not found for kdu_model {}".format(kdu_model)
1793 )
1794
1795 if resource_name:
1796 kdu_values = kdu_values.get(resource_name, None)
1797
1798 if not kdu_values:
1799 msg = "resource {} not found in the values in model {}".format(
1800 resource_name, kdu_model
1801 )
1802 self.log.error(msg)
1803 raise K8sException(msg)
1804
1805 duplicate_check = False
1806
1807 replica_str = ""
1808 replicas = None
1809
1810 if kdu_values.get("replicaCount") is not None:
1811 replicas = kdu_values["replicaCount"]
1812 replica_str = "replicaCount"
1813 elif kdu_values.get("replicas") is not None:
1814 duplicate_check = True
1815 replicas = kdu_values["replicas"]
1816 replica_str = "replicas"
1817 else:
1818 if resource_name:
1819 msg = (
1820 "replicaCount or replicas not found in the resource"
1821 "{} values in model {}. Cannot be scaled".format(
1822 resource_name, kdu_model
1823 )
1824 )
1825 else:
1826 msg = (
1827 "replicaCount or replicas not found in the values"
1828 "in model {}. Cannot be scaled".format(kdu_model)
1829 )
1830 self.log.error(msg)
1831 raise K8sException(msg)
1832
1833 # Control if replicas and replicaCount exists at the same time
1834 msg = "replicaCount and replicas are exists at the same time"
1835 if duplicate_check:
1836 if "replicaCount" in kdu_values:
1837 self.log.error(msg)
1838 raise K8sException(msg)
1839 else:
1840 if "replicas" in kdu_values:
1841 self.log.error(msg)
1842 raise K8sException(msg)
1843
1844 return replicas, replica_str
1845
1846 async def _get_replica_count_instance(
1847 self,
1848 kdu_instance: str,
1849 namespace: str,
1850 kubeconfig: str,
1851 resource_name: str = None,
1852 ) -> int:
1853 """Get the replica count value in the instance.
1854
1855 Args:
1856 kdu_instance: The name of the KDU instance
1857 namespace: KDU instance namespace
1858 kubeconfig:
1859 resource_name: Resource name
1860
1861 Returns:
1862 The number of replicas of the specific instance; if not found, returns None
1863 """
1864
1865 kdu_values = yaml.load(
1866 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1867 Loader=yaml.SafeLoader,
1868 )
1869
1870 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1871
1872 replicas = None
1873
1874 if kdu_values:
1875 resource_values = (
1876 kdu_values.get(resource_name, None) if resource_name else None
1877 )
1878
1879 for replica_str in ("replicaCount", "replicas"):
1880 if resource_values:
1881 replicas = resource_values.get(replica_str)
1882 else:
1883 replicas = kdu_values.get(replica_str)
1884
1885 if replicas is not None:
1886 break
1887
1888 return replicas
1889
1890 async def _store_status(
1891 self,
1892 cluster_id: str,
1893 operation: str,
1894 kdu_instance: str,
1895 namespace: str = None,
1896 db_dict: dict = None,
1897 ) -> None:
1898 """
1899 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1900
1901 :param cluster_id (str): the cluster where the KDU instance is deployed
1902 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1903 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1904 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1905 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1906 values for the keys:
1907 - "collection": The Mongo DB collection to write to
1908 - "filter": The query filter to use in the update process
1909 - "path": The dot separated keys which targets the object to be updated
1910 Defaults to None.
1911 """
1912
1913 try:
1914 detailed_status = await self._status_kdu(
1915 cluster_id=cluster_id,
1916 kdu_instance=kdu_instance,
1917 yaml_format=False,
1918 namespace=namespace,
1919 )
1920
1921 status = detailed_status.get("info").get("description")
1922 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1923
1924 # write status to db
1925 result = await self.write_app_status_to_db(
1926 db_dict=db_dict,
1927 status=str(status),
1928 detailed_status=str(detailed_status),
1929 operation=operation,
1930 )
1931
1932 if not result:
1933 self.log.info("Error writing in database. Task exiting...")
1934
1935 except asyncio.CancelledError as e:
1936 self.log.warning(
1937 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1938 )
1939 except Exception as e:
1940 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1941
1942 # params for use in -f file
1943 # returns values file option and filename (in order to delete it at the end)
1944 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1945 if params and len(params) > 0:
1946 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1947
1948 def get_random_number():
1949 r = random.randrange(start=1, stop=99999999)
1950 s = str(r)
1951 while len(s) < 10:
1952 s = "0" + s
1953 return s
1954
1955 params2 = dict()
1956 for key in params:
1957 value = params.get(key)
1958 if "!!yaml" in str(value):
1959 value = yaml.safe_load(value[7:])
1960 params2[key] = value
1961
1962 values_file = get_random_number() + ".yaml"
1963 with open(values_file, "w") as stream:
1964 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1965
1966 return "-f {}".format(values_file), values_file
1967
1968 return "", None
1969
1970 # params for use in --set option
1971 @staticmethod
1972 def _params_to_set_option(params: dict) -> str:
1973 params_str = ""
1974 if params and len(params) > 0:
1975 start = True
1976 for key in params:
1977 value = params.get(key, None)
1978 if value is not None:
1979 if start:
1980 params_str += "--set "
1981 start = False
1982 else:
1983 params_str += ","
1984 params_str += "{}={}".format(key, value)
1985 return params_str
1986
1987 @staticmethod
1988 def generate_kdu_instance_name(**kwargs):
1989 chart_name = kwargs["kdu_model"]
1990 # check embeded chart (file or dir)
1991 if chart_name.startswith("/"):
1992 # extract file or directory name
1993 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1994 # check URL
1995 elif "://" in chart_name:
1996 # extract last portion of URL
1997 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1998
1999 name = ""
2000 for c in chart_name:
2001 if c.isalpha() or c.isnumeric():
2002 name += c
2003 else:
2004 name += "-"
2005 if len(name) > 35:
2006 name = name[0:35]
2007
2008 # if does not start with alpha character, prefix 'a'
2009 if not name[0].isalpha():
2010 name = "a" + name
2011
2012 name += "-"
2013
2014 def get_random_number():
2015 r = random.randrange(start=1, stop=99999999)
2016 s = str(r)
2017 s = s.rjust(10, "0")
2018 return s
2019
2020 name = name + get_random_number()
2021 return name.lower()
2022
2023 def _split_version(self, kdu_model: str) -> (str, str):
2024 version = None
2025 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
2026 parts = kdu_model.split(sep=":")
2027 if len(parts) == 2:
2028 version = str(parts[1])
2029 kdu_model = parts[0]
2030 return kdu_model, version
2031
2032 def _split_repo(self, kdu_model: str) -> (str, str):
2033 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2034
2035 Args:
2036 kdu_model (str): Associated KDU model
2037
2038 Returns:
2039 (str, str): Tuple with the Chart name in index 0, and the repo name
2040 in index 2; if there was a problem finding them, return None
2041 for both
2042 """
2043
2044 chart_name = None
2045 repo_name = None
2046
2047 idx = kdu_model.find("/")
2048 if idx >= 0:
2049 chart_name = kdu_model[idx + 1 :]
2050 repo_name = kdu_model[:idx]
2051
2052 return chart_name, repo_name
2053
2054 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2055 """Obtain the Helm repository for an Helm Chart
2056
2057 Args:
2058 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2059 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2060
2061 Returns:
2062 str: the repository URL; if Helm Chart is a local one, the function returns None
2063 """
2064
2065 _, repo_name = self._split_repo(kdu_model=kdu_model)
2066
2067 repo_url = None
2068 if repo_name:
2069 # Find repository link
2070 local_repo_list = await self.repo_list(cluster_uuid)
2071 for repo in local_repo_list:
2072 if repo["name"] == repo_name:
2073 repo_url = repo["url"]
2074 break # it is not necessary to continue the loop if the repo link was found...
2075
2076 return repo_url