1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37 from n2vc.kubectl import Kubectl
38
39
40 class K8sHelmBaseConnector(K8sConnector):
41
42 """
43 ####################################################################################
44 ################################### P U B L I C ####################################
45 ####################################################################################
46 """
47
48 service_account = "osm"
49
50 def __init__(
51 self,
52 fs: object,
53 db: object,
54 kubectl_command: str = "/usr/bin/kubectl",
55 helm_command: str = "/usr/bin/helm",
56 log: object = None,
57 on_update_db=None,
58 ):
59 """
60
61 :param fs: file system for kubernetes and helm configuration
62 :param db: database object to write current operation status
63 :param kubectl_command: path to kubectl executable
64 :param helm_command: path to helm executable
65 :param log: logger
66 :param on_update_db: callback called when k8s connector updates database
67 """
68
69 # parent class
70 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
71
72 self.log.info("Initializing K8S Helm connector")
73
74 self.config = EnvironConfig()
75 # random numbers for release name generation
76 random.seed(time.time())
77
78 # the file system
79 self.fs = fs
80
81 # exception if kubectl is not installed
82 self.kubectl_command = kubectl_command
83 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
84
85 # exception if helm is not installed
86 self._helm_command = helm_command
87 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
88
89 # obtain stable repo url from config or apply default
90 self._stable_repo_url = self.config.get("stablerepourl")
91 if self._stable_repo_url == "None":
92 self._stable_repo_url = None
93
94 # Lock to avoid concurrent execution of helm commands
95 self.cmd_lock = asyncio.Lock()
96
97 def _get_namespace(self, cluster_uuid: str) -> str:
98 """
        Obtains the namespace used by the cluster whose uuid is passed as argument

        :param cluster_uuid: cluster's uuid
102 """
103
104 # first, obtain the cluster corresponding to the uuid passed by argument
105 k8scluster = self.db.get_one(
106 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
107 )
108 return k8scluster.get("namespace")
109
110 async def init_env(
111 self,
112 k8s_creds: str,
113 namespace: str = "kube-system",
114 reuse_cluster_uuid=None,
115 **kwargs,
116 ) -> (str, bool):
117 """
118 It prepares a given K8s cluster environment to run Charts
119
120 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
121 '.kube/config'
122 :param namespace: optional namespace to be used for helm. By default,
123 'kube-system' will be used
124 :param reuse_cluster_uuid: existing cluster uuid for reuse
125 :param kwargs: Additional parameters (None yet)
126 :return: uuid of the K8s cluster and True if connector has installed some
127 software in the cluster
128 (on error, an exception will be raised)
129 """
130
131 if reuse_cluster_uuid:
132 cluster_id = reuse_cluster_uuid
133 else:
134 cluster_id = str(uuid4())
135
136 self.log.debug(
137 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
138 )
139
140 paths, env = self._init_paths_env(
141 cluster_name=cluster_id, create_if_not_exist=True
142 )
        mode = stat.S_IRUSR | stat.S_IWUSR
        with open(paths["kube_config"], "w") as f:
            f.write(k8s_creds)
        os.chmod(paths["kube_config"], mode)
147
148 # Code with initialization specific of helm version
149 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
150
151 # sync fs with local data
152 self.fs.reverse_sync(from_path=cluster_id)
153
154 self.log.info("Cluster {} initialized".format(cluster_id))
155
156 return cluster_id, n2vc_installed_sw
157
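    # Usage sketch for init_env (illustrative only, not part of the original module):
    # assuming `connector` is an instance of a concrete subclass and `creds` holds the
    # text of a valid kubeconfig, registering a cluster could look like:
    #
    #     cluster_id, installed_sw = await connector.init_env(creds, namespace="kube-system")
    #
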
158 async def repo_add(
159 self,
160 cluster_uuid: str,
161 name: str,
162 url: str,
163 repo_type: str = "chart",
164 cert: str = None,
165 user: str = None,
166 password: str = None,
167 ):
168 self.log.debug(
169 "Cluster {}, adding {} repository {}. URL: {}".format(
170 cluster_uuid, repo_type, name, url
171 )
172 )
173
174 # init_env
175 paths, env = self._init_paths_env(
176 cluster_name=cluster_uuid, create_if_not_exist=True
177 )
178
179 # sync local dir
180 self.fs.sync(from_path=cluster_uuid)
181
182 # helm repo add name url
183 command = ("env KUBECONFIG={} {} repo add {} {}").format(
184 paths["kube_config"], self._helm_command, name, url
185 )
186
187 if cert:
188 temp_cert_file = os.path.join(
189 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
190 )
191 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
192 with open(temp_cert_file, "w") as the_cert:
193 the_cert.write(cert)
194 command += " --ca-file {}".format(temp_cert_file)
195
196 if user:
197 command += " --username={}".format(user)
198
199 if password:
200 command += " --password={}".format(password)
201
202 self.log.debug("adding repo: {}".format(command))
203 await self._local_async_exec(
204 command=command, raise_exception_on_error=True, env=env
205 )
206
207 # helm repo update
208 command = "env KUBECONFIG={} {} repo update {}".format(
209 paths["kube_config"], self._helm_command, name
210 )
211 self.log.debug("updating repo: {}".format(command))
212 await self._local_async_exec(
213 command=command, raise_exception_on_error=False, env=env
214 )
215
216 # sync fs
217 self.fs.reverse_sync(from_path=cluster_uuid)
218
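    # Usage sketch for repo_add (illustrative only): assuming `connector` is a concrete
    # subclass instance and `cluster_id` an already initialized cluster, adding a public
    # chart repository could look like:
    #
    #     await connector.repo_add(
    #         cluster_id, "bitnami", "https://charts.bitnami.com/bitnami"
    #     )
    #
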
219 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
220 self.log.debug(
221 "Cluster {}, updating {} repository {}".format(
222 cluster_uuid, repo_type, name
223 )
224 )
225
226 # init_env
227 paths, env = self._init_paths_env(
228 cluster_name=cluster_uuid, create_if_not_exist=True
229 )
230
231 # sync local dir
232 self.fs.sync(from_path=cluster_uuid)
233
234 # helm repo update
235 command = "{} repo update {}".format(self._helm_command, name)
236 self.log.debug("updating repo: {}".format(command))
237 await self._local_async_exec(
238 command=command, raise_exception_on_error=False, env=env
239 )
240
241 # sync fs
242 self.fs.reverse_sync(from_path=cluster_uuid)
243
244 async def repo_list(self, cluster_uuid: str) -> list:
245 """
246 Get the list of registered repositories
247
248 :return: list of registered repositories: [ (name, url) .... ]
249 """
250
251 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
252
253 # config filename
254 paths, env = self._init_paths_env(
255 cluster_name=cluster_uuid, create_if_not_exist=True
256 )
257
258 # sync local dir
259 self.fs.sync(from_path=cluster_uuid)
260
261 command = "env KUBECONFIG={} {} repo list --output yaml".format(
262 paths["kube_config"], self._helm_command
263 )
264
        # Set raise_exception_on_error to False: if there are no repos we just want an empty list
266 output, _rc = await self._local_async_exec(
267 command=command, raise_exception_on_error=False, env=env
268 )
269
270 # sync fs
271 self.fs.reverse_sync(from_path=cluster_uuid)
272
273 if _rc == 0:
274 if output and len(output) > 0:
275 repos = yaml.load(output, Loader=yaml.SafeLoader)
276 # unify format between helm2 and helm3 setting all keys lowercase
277 return self._lower_keys_list(repos)
278 else:
279 return []
280 else:
281 return []
282
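    # Example of the structure returned by repo_list (keys are lower-cased to unify
    # helm2/helm3 output); the values below are illustrative only:
    #
    #     [{"name": "bitnami", "url": "https://charts.bitnami.com/bitnami"}]
    #
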
283 async def repo_remove(self, cluster_uuid: str, name: str):
284 self.log.debug(
285 "remove {} repositories for cluster {}".format(name, cluster_uuid)
286 )
287
288 # init env, paths
289 paths, env = self._init_paths_env(
290 cluster_name=cluster_uuid, create_if_not_exist=True
291 )
292
293 # sync local dir
294 self.fs.sync(from_path=cluster_uuid)
295
296 command = "env KUBECONFIG={} {} repo remove {}".format(
297 paths["kube_config"], self._helm_command, name
298 )
299 await self._local_async_exec(
300 command=command, raise_exception_on_error=True, env=env
301 )
302
303 # sync fs
304 self.fs.reverse_sync(from_path=cluster_uuid)
305
306 async def reset(
307 self,
308 cluster_uuid: str,
309 force: bool = False,
310 uninstall_sw: bool = False,
311 **kwargs,
312 ) -> bool:
313 """Reset a cluster
314
315 Resets the Kubernetes cluster by removing the helm deployment that represents it.
316
317 :param cluster_uuid: The UUID of the cluster to reset
318 :param force: Boolean to force the reset
        :param uninstall_sw: Boolean to also uninstall the software installed in the cluster by the connector
320 :param kwargs: Additional parameters (None yet)
321 :return: Returns True if successful or raises an exception.
322 """
323 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
324 self.log.debug(
325 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
326 cluster_uuid, uninstall_sw
327 )
328 )
329
330 # sync local dir
331 self.fs.sync(from_path=cluster_uuid)
332
333 # uninstall releases if needed.
334 if uninstall_sw:
335 releases = await self.instances_list(cluster_uuid=cluster_uuid)
336 if len(releases) > 0:
337 if force:
338 for r in releases:
339 try:
340 kdu_instance = r.get("name")
341 chart = r.get("chart")
342 self.log.debug(
343 "Uninstalling {} -> {}".format(chart, kdu_instance)
344 )
345 await self.uninstall(
346 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
347 )
                        except Exception as e:
                            # do not raise an exception here: uninstalling releases
                            # left over from previous installations has been seen to
                            # fail in some cases
                            self.log.warn(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and force is not set. "
                        "Leaving the K8s helm environment in place"
                    ).format(cluster_uuid)
                    self.log.warn(msg)
                    uninstall_sw = (
                        False  # allow removing the k8s cluster without removing Tiller
                    )
365
366 if uninstall_sw:
367 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
368
369 # delete cluster directory
370 self.log.debug("Removing directory {}".format(cluster_uuid))
371 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
        # Also remove the local directory if it still exists
373 direct = self.fs.path + "/" + cluster_uuid
374 shutil.rmtree(direct, ignore_errors=True)
375
376 return True
377
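    # Usage sketch for reset (illustrative only): removing a cluster registration,
    # uninstalling the software installed by the connector and forcing the removal of
    # any existing releases:
    #
    #     await connector.reset(cluster_id, force=True, uninstall_sw=True)
    #
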
378 def _is_helm_chart_a_file(self, chart_name: str):
379 return chart_name.count("/") > 1
380
381 async def _install_impl(
382 self,
383 cluster_id: str,
384 kdu_model: str,
385 paths: dict,
386 env: dict,
387 kdu_instance: str,
388 atomic: bool = True,
389 timeout: float = 300,
390 params: dict = None,
391 db_dict: dict = None,
392 kdu_name: str = None,
393 namespace: str = None,
394 ):
395 # init env, paths
396 paths, env = self._init_paths_env(
397 cluster_name=cluster_id, create_if_not_exist=True
398 )
399
400 # params to str
401 params_str, file_to_delete = self._params_to_file_option(
402 cluster_id=cluster_id, params=params
403 )
404
405 # version
406 kdu_model, version = self._split_version(kdu_model)
407
408 _, repo = self._split_repo(kdu_model)
409 if repo:
410 await self.repo_update(cluster_id, repo)
411
412 command = self._get_install_command(
413 kdu_model,
414 kdu_instance,
415 namespace,
416 params_str,
417 version,
418 atomic,
419 timeout,
420 paths["kube_config"],
421 )
422
423 self.log.debug("installing: {}".format(command))
424
425 if atomic:
426 # exec helm in a task
427 exec_task = asyncio.ensure_future(
428 coro_or_future=self._local_async_exec(
429 command=command, raise_exception_on_error=False, env=env
430 )
431 )
432
433 # write status in another task
434 status_task = asyncio.ensure_future(
435 coro_or_future=self._store_status(
436 cluster_id=cluster_id,
437 kdu_instance=kdu_instance,
438 namespace=namespace,
439 db_dict=db_dict,
440 operation="install",
441 )
442 )
443
444 # wait for execution task
445 await asyncio.wait([exec_task])
446
447 # cancel status task
448 status_task.cancel()
449
450 output, rc = exec_task.result()
451
452 else:
453
454 output, rc = await self._local_async_exec(
455 command=command, raise_exception_on_error=False, env=env
456 )
457
        # remove temporary values yaml file
459 if file_to_delete:
460 os.remove(file_to_delete)
461
462 # write final status
463 await self._store_status(
464 cluster_id=cluster_id,
465 kdu_instance=kdu_instance,
466 namespace=namespace,
467 db_dict=db_dict,
468 operation="install",
469 )
470
471 if rc != 0:
472 msg = "Error executing command: {}\nOutput: {}".format(command, output)
473 self.log.error(msg)
474 raise K8sException(msg)
475
476 async def upgrade(
477 self,
478 cluster_uuid: str,
479 kdu_instance: str,
480 kdu_model: str = None,
481 atomic: bool = True,
482 timeout: float = 300,
483 params: dict = None,
484 db_dict: dict = None,
485 namespace: str = None,
486 force: bool = False,
487 ):
488 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
489
490 # sync local dir
491 self.fs.sync(from_path=cluster_uuid)
492
493 # look for instance to obtain namespace
494
495 # set namespace
496 if not namespace:
497 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
498 if not instance_info:
499 raise K8sException("kdu_instance {} not found".format(kdu_instance))
500 namespace = instance_info["namespace"]
501
502 # init env, paths
503 paths, env = self._init_paths_env(
504 cluster_name=cluster_uuid, create_if_not_exist=True
505 )
506
507 # sync local dir
508 self.fs.sync(from_path=cluster_uuid)
509
510 # params to str
511 params_str, file_to_delete = self._params_to_file_option(
512 cluster_id=cluster_uuid, params=params
513 )
514
515 # version
516 kdu_model, version = self._split_version(kdu_model)
517
518 _, repo = self._split_repo(kdu_model)
519 if repo:
520 await self.repo_update(cluster_uuid, repo)
521
522 command = self._get_upgrade_command(
523 kdu_model,
524 kdu_instance,
525 namespace,
526 params_str,
527 version,
528 atomic,
529 timeout,
530 paths["kube_config"],
531 force,
532 )
533
534 self.log.debug("upgrading: {}".format(command))
535
536 if atomic:
537
538 # exec helm in a task
539 exec_task = asyncio.ensure_future(
540 coro_or_future=self._local_async_exec(
541 command=command, raise_exception_on_error=False, env=env
542 )
543 )
544 # write status in another task
545 status_task = asyncio.ensure_future(
546 coro_or_future=self._store_status(
547 cluster_id=cluster_uuid,
548 kdu_instance=kdu_instance,
549 namespace=namespace,
550 db_dict=db_dict,
551 operation="upgrade",
552 )
553 )
554
555 # wait for execution task
556 await asyncio.wait([exec_task])
557
558 # cancel status task
559 status_task.cancel()
560 output, rc = exec_task.result()
561
562 else:
563
564 output, rc = await self._local_async_exec(
565 command=command, raise_exception_on_error=False, env=env
566 )
567
        # remove temporary values yaml file
569 if file_to_delete:
570 os.remove(file_to_delete)
571
572 # write final status
573 await self._store_status(
574 cluster_id=cluster_uuid,
575 kdu_instance=kdu_instance,
576 namespace=namespace,
577 db_dict=db_dict,
578 operation="upgrade",
579 )
580
581 if rc != 0:
582 msg = "Error executing command: {}\nOutput: {}".format(command, output)
583 self.log.error(msg)
584 raise K8sException(msg)
585
586 # sync fs
587 self.fs.reverse_sync(from_path=cluster_uuid)
588
589 # return new revision number
590 instance = await self.get_instance_info(
591 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
592 )
593 if instance:
594 revision = int(instance.get("revision"))
595 self.log.debug("New revision: {}".format(revision))
596 return revision
597 else:
598 return 0
599
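    # Usage sketch for upgrade (illustrative only): upgrading a release to a specific
    # chart version with overridden params; "stable/mariadb:10.1.0" and the params below
    # are example values:
    #
    #     new_revision = await connector.upgrade(
    #         cluster_id, kdu_instance, kdu_model="stable/mariadb:10.1.0",
    #         params={"service.type": "ClusterIP"},
    #     )
    #
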
600 async def scale(
601 self,
602 kdu_instance: str,
603 scale: int,
604 resource_name: str,
605 total_timeout: float = 1800,
606 cluster_uuid: str = None,
607 kdu_model: str = None,
608 atomic: bool = True,
609 db_dict: dict = None,
610 **kwargs,
611 ):
612 """Scale a resource in a Helm Chart.
613
614 Args:
615 kdu_instance: KDU instance name
616 scale: Scale to which to set the resource
617 resource_name: Resource name
618 total_timeout: The time, in seconds, to wait
619 cluster_uuid: The UUID of the cluster
620 kdu_model: The chart reference
621 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
622 The --wait flag will be set automatically if --atomic is used
623 db_dict: Dictionary for any additional data
624 kwargs: Additional parameters
625
626 Returns:
627 True if successful, False otherwise
628 """
629
        debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_msg = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_msg)
637
638 # look for instance to obtain namespace
639 # get_instance_info function calls the sync command
640 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
641 if not instance_info:
642 raise K8sException("kdu_instance {} not found".format(kdu_instance))
643
644 # init env, paths
645 paths, env = self._init_paths_env(
646 cluster_name=cluster_uuid, create_if_not_exist=True
647 )
648
649 # version
650 kdu_model, version = self._split_version(kdu_model)
651
652 repo_url = await self._find_repo(kdu_model, cluster_uuid)
653
654 _, replica_str = await self._get_replica_count_url(
655 kdu_model, repo_url, resource_name
656 )
657
658 command = self._get_upgrade_scale_command(
659 kdu_model,
660 kdu_instance,
661 instance_info["namespace"],
662 scale,
663 version,
664 atomic,
665 replica_str,
666 total_timeout,
667 resource_name,
668 paths["kube_config"],
669 )
670
671 self.log.debug("scaling: {}".format(command))
672
673 if atomic:
674 # exec helm in a task
675 exec_task = asyncio.ensure_future(
676 coro_or_future=self._local_async_exec(
677 command=command, raise_exception_on_error=False, env=env
678 )
679 )
680 # write status in another task
681 status_task = asyncio.ensure_future(
682 coro_or_future=self._store_status(
683 cluster_id=cluster_uuid,
684 kdu_instance=kdu_instance,
685 namespace=instance_info["namespace"],
686 db_dict=db_dict,
687 operation="scale",
688 )
689 )
690
691 # wait for execution task
692 await asyncio.wait([exec_task])
693
694 # cancel status task
695 status_task.cancel()
696 output, rc = exec_task.result()
697
698 else:
699 output, rc = await self._local_async_exec(
700 command=command, raise_exception_on_error=False, env=env
701 )
702
703 # write final status
704 await self._store_status(
705 cluster_id=cluster_uuid,
706 kdu_instance=kdu_instance,
707 namespace=instance_info["namespace"],
708 db_dict=db_dict,
709 operation="scale",
710 )
711
712 if rc != 0:
713 msg = "Error executing command: {}\nOutput: {}".format(command, output)
714 self.log.error(msg)
715 raise K8sException(msg)
716
717 # sync fs
718 self.fs.reverse_sync(from_path=cluster_uuid)
719
720 return True
721
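    # Usage sketch for scale (illustrative only): scaling the "backend" resource of a
    # release to 3 replicas ("backend" is an example resource name defined in the chart
    # values):
    #
    #     await connector.scale(
    #         kdu_instance, 3, "backend", cluster_uuid=cluster_id, kdu_model=kdu_model
    #     )
    #
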
722 async def get_scale_count(
723 self,
724 resource_name: str,
725 kdu_instance: str,
726 cluster_uuid: str,
727 kdu_model: str,
728 **kwargs,
729 ) -> int:
730 """Get a resource scale count.
731
732 Args:
733 cluster_uuid: The UUID of the cluster
734 resource_name: Resource name
735 kdu_instance: KDU instance name
            kdu_model: The name or path of a Helm Chart
737 kwargs: Additional parameters
738
739 Returns:
740 Resource instance count
741 """
742
743 self.log.debug(
744 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
745 )
746
747 # look for instance to obtain namespace
748 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
749 if not instance_info:
750 raise K8sException("kdu_instance {} not found".format(kdu_instance))
751
752 # init env, paths
753 paths, _ = self._init_paths_env(
754 cluster_name=cluster_uuid, create_if_not_exist=True
755 )
756
757 replicas = await self._get_replica_count_instance(
758 kdu_instance=kdu_instance,
759 namespace=instance_info["namespace"],
760 kubeconfig=paths["kube_config"],
761 resource_name=resource_name,
762 )
763
764 self.log.debug(
765 f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
766 )
767
768 # Get default value if scale count is not found from provided values
769 # Important note: this piece of code shall only be executed in the first scaling operation,
770 # since it is expected that the _get_replica_count_instance is able to obtain the number of
771 # replicas when a scale operation was already conducted previously for this KDU/resource!
772 if replicas is None:
773 repo_url = await self._find_repo(
774 kdu_model=kdu_model, cluster_uuid=cluster_uuid
775 )
776 replicas, _ = await self._get_replica_count_url(
777 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
778 )
779
780 self.log.debug(
781 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
782 f"{resource_name} obtained: {replicas}"
783 )
784
785 if replicas is None:
786 msg = "Replica count not found. Cannot be scaled"
787 self.log.error(msg)
788 raise K8sException(msg)
789
790 return int(replicas)
791
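    # Usage sketch for get_scale_count (illustrative only): reading the current replica
    # count for the same example resource before deciding a new scale value:
    #
    #     current = await connector.get_scale_count(
    #         "backend", kdu_instance, cluster_id, kdu_model
    #     )
    #
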
792 async def rollback(
793 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
794 ):
795 self.log.debug(
796 "rollback kdu_instance {} to revision {} from cluster {}".format(
797 kdu_instance, revision, cluster_uuid
798 )
799 )
800
801 # sync local dir
802 self.fs.sync(from_path=cluster_uuid)
803
804 # look for instance to obtain namespace
805 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
806 if not instance_info:
807 raise K8sException("kdu_instance {} not found".format(kdu_instance))
808
809 # init env, paths
810 paths, env = self._init_paths_env(
811 cluster_name=cluster_uuid, create_if_not_exist=True
812 )
813
814 # sync local dir
815 self.fs.sync(from_path=cluster_uuid)
816
817 command = self._get_rollback_command(
818 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
819 )
820
821 self.log.debug("rolling_back: {}".format(command))
822
823 # exec helm in a task
824 exec_task = asyncio.ensure_future(
825 coro_or_future=self._local_async_exec(
826 command=command, raise_exception_on_error=False, env=env
827 )
828 )
829 # write status in another task
830 status_task = asyncio.ensure_future(
831 coro_or_future=self._store_status(
832 cluster_id=cluster_uuid,
833 kdu_instance=kdu_instance,
834 namespace=instance_info["namespace"],
835 db_dict=db_dict,
836 operation="rollback",
837 )
838 )
839
840 # wait for execution task
841 await asyncio.wait([exec_task])
842
843 # cancel status task
844 status_task.cancel()
845
846 output, rc = exec_task.result()
847
848 # write final status
849 await self._store_status(
850 cluster_id=cluster_uuid,
851 kdu_instance=kdu_instance,
852 namespace=instance_info["namespace"],
853 db_dict=db_dict,
854 operation="rollback",
855 )
856
857 if rc != 0:
858 msg = "Error executing command: {}\nOutput: {}".format(command, output)
859 self.log.error(msg)
860 raise K8sException(msg)
861
862 # sync fs
863 self.fs.reverse_sync(from_path=cluster_uuid)
864
865 # return new revision number
866 instance = await self.get_instance_info(
867 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
868 )
869 if instance:
870 revision = int(instance.get("revision"))
871 self.log.debug("New revision: {}".format(revision))
872 return revision
873 else:
874 return 0
875
876 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
877 """
        Removes an existing KDU instance. It implicitly uses the `delete` or `uninstall` call
        (this call should happen after all _terminate-config-primitive_ of the VNF
        are invoked).
881
882 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
883 :param kdu_instance: unique name for the KDU instance to be deleted
884 :param kwargs: Additional parameters (None yet)
885 :return: True if successful
886 """
887
888 self.log.debug(
889 "uninstall kdu_instance {} from cluster {}".format(
890 kdu_instance, cluster_uuid
891 )
892 )
893
894 # sync local dir
895 self.fs.sync(from_path=cluster_uuid)
896
897 # look for instance to obtain namespace
898 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
899 if not instance_info:
900 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
901 return True
902 # init env, paths
903 paths, env = self._init_paths_env(
904 cluster_name=cluster_uuid, create_if_not_exist=True
905 )
906
907 # sync local dir
908 self.fs.sync(from_path=cluster_uuid)
909
910 command = self._get_uninstall_command(
911 kdu_instance, instance_info["namespace"], paths["kube_config"]
912 )
913 output, _rc = await self._local_async_exec(
914 command=command, raise_exception_on_error=True, env=env
915 )
916
917 # sync fs
918 self.fs.reverse_sync(from_path=cluster_uuid)
919
920 return self._output_to_table(output)
921
922 async def instances_list(self, cluster_uuid: str) -> list:
923 """
924 returns a list of deployed releases in a cluster
925
926 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
927 :return:
928 """
929
930 self.log.debug("list releases for cluster {}".format(cluster_uuid))
931
932 # sync local dir
933 self.fs.sync(from_path=cluster_uuid)
934
935 # execute internal command
936 result = await self._instances_list(cluster_uuid)
937
938 # sync fs
939 self.fs.reverse_sync(from_path=cluster_uuid)
940
941 return result
942
943 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
944 instances = await self.instances_list(cluster_uuid=cluster_uuid)
945 for instance in instances:
946 if instance.get("name") == kdu_instance:
947 return instance
948 self.log.debug("Instance {} not found".format(kdu_instance))
949 return None
950
951 async def upgrade_charm(
952 self,
953 ee_id: str = None,
954 path: str = None,
955 charm_id: str = None,
956 charm_type: str = None,
957 timeout: float = None,
958 ) -> str:
959 """This method upgrade charms in VNFs
960
961 Args:
962 ee_id: Execution environment id
963 path: Local path to the charm
964 charm_id: charm-id
965 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
966 timeout: (Float) Timeout for the ns update operation
967
968 Returns:
969 The output of the update operation if status equals to "completed"
970 """
971 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
972
973 async def exec_primitive(
974 self,
975 cluster_uuid: str = None,
976 kdu_instance: str = None,
977 primitive_name: str = None,
978 timeout: float = 300,
979 params: dict = None,
980 db_dict: dict = None,
981 **kwargs,
982 ) -> str:
983 """Exec primitive (Juju action)
984
985 :param cluster_uuid: The UUID of the cluster or namespace:cluster
986 :param kdu_instance: The unique name of the KDU instance
987 :param primitive_name: Name of action that will be executed
988 :param timeout: Timeout for action execution
989 :param params: Dictionary of all the parameters needed for the action
990 :db_dict: Dictionary for any additional data
991 :param kwargs: Additional parameters (None yet)
992
993 :return: Returns the output of the action
994 """
995 raise K8sException(
996 "KDUs deployed with Helm don't support actions "
997 "different from rollback, upgrade and status"
998 )
999
1000 async def get_services(
1001 self, cluster_uuid: str, kdu_instance: str, namespace: str
1002 ) -> list:
1003 """
1004 Returns a list of services defined for the specified kdu instance.
1005
1006 :param cluster_uuid: UUID of a K8s cluster known by OSM
1007 :param kdu_instance: unique name for the KDU instance
1008 :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return a list of services. Each service
            can have the following data:
            - `name` of the service
            - `type` type of service in the k8s cluster
            - `ports` List of ports offered by the service; each port includes at least
                name, port, protocol
            - `cluster_ip` Internal ip to be used inside k8s cluster
            - `external_ip` List of external ips (in case they are available)
1017 """
1018
1019 self.log.debug(
1020 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1021 cluster_uuid, kdu_instance
1022 )
1023 )
1024
1025 # init env, paths
1026 paths, env = self._init_paths_env(
1027 cluster_name=cluster_uuid, create_if_not_exist=True
1028 )
1029
1030 # sync local dir
1031 self.fs.sync(from_path=cluster_uuid)
1032
1033 # get list of services names for kdu
1034 service_names = await self._get_services(
1035 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1036 )
1037
1038 service_list = []
1039 for service in service_names:
1040 service = await self._get_service(cluster_uuid, service, namespace)
1041 service_list.append(service)
1042
1043 # sync fs
1044 self.fs.reverse_sync(from_path=cluster_uuid)
1045
1046 return service_list
1047
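    # Example of one entry returned by get_services (illustrative values only; the
    # "external_ip" key is present only for LoadBalancer services):
    #
    #     {
    #         "name": "mariadb",
    #         "type": "ClusterIP",
    #         "ports": [{"name": "mysql", "port": 3306, "protocol": "TCP"}],
    #         "cluster_ip": "10.152.183.79",
    #     }
    #
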
1048 async def get_service(
1049 self, cluster_uuid: str, service_name: str, namespace: str
1050 ) -> object:
1051
1052 self.log.debug(
1053 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1054 service_name, namespace, cluster_uuid
1055 )
1056 )
1057
1058 # sync local dir
1059 self.fs.sync(from_path=cluster_uuid)
1060
1061 service = await self._get_service(cluster_uuid, service_name, namespace)
1062
1063 # sync fs
1064 self.fs.reverse_sync(from_path=cluster_uuid)
1065
1066 return service
1067
1068 async def status_kdu(
1069 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1070 ) -> Union[str, dict]:
1071 """
        This call retrieves the current state of a given KDU instance. It allows
        retrieving the _composition_ (i.e. K8s objects) and _specific values_ of the
        configuration parameters applied to a given instance. This call is based on the
        `status` call.
1076
1077 :param cluster_uuid: UUID of a K8s cluster known by OSM
1078 :param kdu_instance: unique name for the KDU instance
1079 :param kwargs: Additional parameters (None yet)
        :param yaml_format: whether the result shall be returned as a YAML string or as a
            dictionary
1082 :return: If successful, it will return the following vector of arguments:
1083 - K8s `namespace` in the cluster where the KDU lives
1084 - `state` of the KDU instance. It can be:
1085 - UNKNOWN
1086 - DEPLOYED
1087 - DELETED
1088 - SUPERSEDED
1089 - FAILED or
1090 - DELETING
1091 - List of `resources` (objects) that this release consists of, sorted by kind,
1092 and the status of those resources
1093 - Last `deployment_time`.
1094
1095 """
1096 self.log.debug(
1097 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1098 cluster_uuid, kdu_instance
1099 )
1100 )
1101
1102 # sync local dir
1103 self.fs.sync(from_path=cluster_uuid)
1104
1105 # get instance: needed to obtain namespace
1106 instances = await self._instances_list(cluster_id=cluster_uuid)
1107 for instance in instances:
1108 if instance.get("name") == kdu_instance:
1109 break
1110 else:
1111 # instance does not exist
1112 raise K8sException(
1113 "Instance name: {} not found in cluster: {}".format(
1114 kdu_instance, cluster_uuid
1115 )
1116 )
1117
1118 status = await self._status_kdu(
1119 cluster_id=cluster_uuid,
1120 kdu_instance=kdu_instance,
1121 namespace=instance["namespace"],
1122 yaml_format=yaml_format,
1123 show_error_log=True,
1124 )
1125
1126 # sync fs
1127 self.fs.reverse_sync(from_path=cluster_uuid)
1128
1129 return status
1130
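    # Usage sketch for status_kdu (illustrative only): obtaining the release status as a
    # dictionary and reading the human-readable description (the "info"/"description"
    # path is the same one used internally by _store_status):
    #
    #     status = await connector.status_kdu(cluster_id, kdu_instance, yaml_format=False)
    #     description = status.get("info", {}).get("description")
    #
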
1131 async def get_values_kdu(
1132 self, kdu_instance: str, namespace: str, kubeconfig: str
1133 ) -> str:
1134
1135 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1136
1137 return await self._exec_get_command(
1138 get_command="values",
1139 kdu_instance=kdu_instance,
1140 namespace=namespace,
1141 kubeconfig=kubeconfig,
1142 )
1143
1144 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1145 """Method to obtain the Helm Chart package's values
1146
1147 Args:
            kdu_model: The name or path of a Helm Chart
1149 repo_url: Helm Chart repository url
1150
1151 Returns:
1152 str: the values of the Helm Chart package
1153 """
1154
1155 self.log.debug(
1156 "inspect kdu_model values {} from (optional) repo: {}".format(
1157 kdu_model, repo_url
1158 )
1159 )
1160
1161 return await self._exec_inspect_command(
1162 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1163 )
1164
1165 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1166
1167 self.log.debug(
1168 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1169 )
1170
1171 return await self._exec_inspect_command(
1172 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1173 )
1174
1175 async def synchronize_repos(self, cluster_uuid: str):
1176
1177 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1178 try:
1179 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1180 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1181
1182 local_repo_list = await self.repo_list(cluster_uuid)
1183 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1184
1185 deleted_repo_list = []
1186 added_repo_dict = {}
1187
1188 # iterate over the list of repos in the database that should be
1189 # added if not present
1190 for repo_name, db_repo in db_repo_dict.items():
1191 try:
1192 # check if it is already present
1193 curr_repo_url = local_repo_dict.get(db_repo["name"])
1194 repo_id = db_repo.get("_id")
1195 if curr_repo_url != db_repo["url"]:
1196 if curr_repo_url:
1197 self.log.debug(
1198 "repo {} url changed, delete and and again".format(
1199 db_repo["url"]
1200 )
1201 )
1202 await self.repo_remove(cluster_uuid, db_repo["name"])
1203 deleted_repo_list.append(repo_id)
1204
1205 # add repo
1206 self.log.debug("add repo {}".format(db_repo["name"]))
1207 if "ca_cert" in db_repo:
1208 await self.repo_add(
1209 cluster_uuid,
1210 db_repo["name"],
1211 db_repo["url"],
1212 cert=db_repo["ca_cert"],
1213 )
1214 else:
1215 await self.repo_add(
1216 cluster_uuid,
1217 db_repo["name"],
1218 db_repo["url"],
1219 )
1220 added_repo_dict[repo_id] = db_repo["name"]
1221 except Exception as e:
1222 raise K8sException(
1223 "Error adding repo id: {}, err_msg: {} ".format(
1224 repo_id, repr(e)
1225 )
1226 )
1227
1228 # Delete repos that are present but not in nbi_list
1229 for repo_name in local_repo_dict:
1230 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1231 self.log.debug("delete repo {}".format(repo_name))
1232 try:
1233 await self.repo_remove(cluster_uuid, repo_name)
1234 deleted_repo_list.append(repo_name)
1235 except Exception as e:
                        self.log.warning(
1237 "Error deleting repo, name: {}, err_msg: {}".format(
1238 repo_name, str(e)
1239 )
1240 )
1241
1242 return deleted_repo_list, added_repo_dict
1243
1244 except K8sException:
1245 raise
1246 except Exception as e:
            # log and propagate the error as a generic exception
1248 self.log.error("Error synchronizing repos: {}".format(e))
1249 raise Exception("Error synchronizing repos: {}".format(e))
1250
1251 def _get_db_repos_dict(self, repo_ids: list):
1252 db_repos_dict = {}
1253 for repo_id in repo_ids:
1254 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1255 db_repos_dict[db_repo["name"]] = db_repo
1256 return db_repos_dict
1257
1258 """
1259 ####################################################################################
1260 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1261 ####################################################################################
1262 """
1263
1264 @abc.abstractmethod
1265 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1266 """
        Creates the base cluster and kube dirs and returns them.
        Also creates the helm3 dirs according to the new directory specification; these
        paths are not returned but assigned to helm environment variables
1270
1271 :param cluster_name: cluster_name
1272 :return: Dictionary with config_paths and dictionary with helm environment variables
1273 """
1274
1275 @abc.abstractmethod
1276 async def _cluster_init(self, cluster_id, namespace, paths, env):
1277 """
1278 Implements the helm version dependent cluster initialization
1279 """
1280
1281 @abc.abstractmethod
1282 async def _instances_list(self, cluster_id):
1283 """
1284 Implements the helm version dependent helm instances list
1285 """
1286
1287 @abc.abstractmethod
1288 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1289 """
1290 Implements the helm version dependent method to obtain services from a helm instance
1291 """
1292
1293 @abc.abstractmethod
1294 async def _status_kdu(
1295 self,
1296 cluster_id: str,
1297 kdu_instance: str,
1298 namespace: str = None,
1299 yaml_format: bool = False,
1300 show_error_log: bool = False,
1301 ) -> Union[str, dict]:
1302 """
1303 Implements the helm version dependent method to obtain status of a helm instance
1304 """
1305
1306 @abc.abstractmethod
1307 def _get_install_command(
1308 self,
1309 kdu_model,
1310 kdu_instance,
1311 namespace,
1312 params_str,
1313 version,
1314 atomic,
1315 timeout,
1316 kubeconfig,
1317 ) -> str:
1318 """
        Obtain the command to be executed to install the indicated instance
1320 """
1321
1322 @abc.abstractmethod
1323 def _get_upgrade_scale_command(
1324 self,
1325 kdu_model,
1326 kdu_instance,
1327 namespace,
1328 count,
1329 version,
1330 atomic,
1331 replicas,
1332 timeout,
1333 resource_name,
1334 kubeconfig,
1335 ) -> str:
1336 """Generates the command to scale a Helm Chart release
1337
1338 Args:
1339 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1340 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1341 namespace (str): Namespace where this KDU instance is deployed
            count (int): Scale count
1343 version (str): Constraint with specific version of the Chart to use
1344 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1345 The --wait flag will be set automatically if --atomic is used
            replicas (str): The key under the resource_name key where the scale count is stored
1347 timeout (float): The time, in seconds, to wait
1348 resource_name (str): The KDU's resource to scale
1349 kubeconfig (str): Kubeconfig file path
1350
1351 Returns:
1352 str: command to scale a Helm Chart release
1353 """
1354
1355 @abc.abstractmethod
1356 def _get_upgrade_command(
1357 self,
1358 kdu_model,
1359 kdu_instance,
1360 namespace,
1361 params_str,
1362 version,
1363 atomic,
1364 timeout,
1365 kubeconfig,
1366 force,
1367 ) -> str:
1368 """Generates the command to upgrade a Helm Chart release
1369
1370 Args:
1371 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1372 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1373 namespace (str): Namespace where this KDU instance is deployed
1374 params_str (str): Params used to upgrade the Helm Chart release
1375 version (str): Constraint with specific version of the Chart to use
1376 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1377 The --wait flag will be set automatically if --atomic is used
1378 timeout (float): The time, in seconds, to wait
1379 kubeconfig (str): Kubeconfig file path
1380 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1381 Returns:
1382 str: command to upgrade a Helm Chart release
1383 """
1384
1385 @abc.abstractmethod
1386 def _get_rollback_command(
1387 self, kdu_instance, namespace, revision, kubeconfig
1388 ) -> str:
1389 """
1390 Obtain command to be executed to rollback the indicated instance
1391 """
1392
1393 @abc.abstractmethod
1394 def _get_uninstall_command(
1395 self, kdu_instance: str, namespace: str, kubeconfig: str
1396 ) -> str:
1397 """
1398 Obtain command to be executed to delete the indicated instance
1399 """
1400
1401 @abc.abstractmethod
1402 def _get_inspect_command(
1403 self, show_command: str, kdu_model: str, repo_str: str, version: str
1404 ):
1405 """Generates the command to obtain the information about an Helm Chart package
1406 (´helm show ...´ command)
1407
1408 Args:
1409 show_command: the second part of the command (`helm show <show_command>`)
1410 kdu_model: The name or path of an Helm Chart
1411 repo_url: Helm Chart repository url
1412 version: constraint with specific version of the Chart to use
1413
1414 Returns:
1415 str: the generated Helm Chart command
1416 """
1417
1418 @abc.abstractmethod
1419 def _get_get_command(
1420 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1421 ):
1422 """Obtain command to be executed to get information about the kdu instance."""
1423
1424 @abc.abstractmethod
1425 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1426 """
        Method called to uninstall the cluster software for helm. This method depends
        on the helm version.
        For Helm v2 it will be called when Tiller must be uninstalled
        For Helm v3 it does nothing and does not need to be called
1431 """
1432
1433 @abc.abstractmethod
1434 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1435 """
1436 Obtains the cluster repos identifiers
1437 """
1438
1439 """
1440 ####################################################################################
1441 ################################### P R I V A T E ##################################
1442 ####################################################################################
1443 """
1444
1445 @staticmethod
1446 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1447 if os.path.exists(filename):
1448 return True
1449 else:
1450 msg = "File {} does not exist".format(filename)
1451 if exception_if_not_exists:
1452 raise K8sException(msg)
1453
1454 @staticmethod
1455 def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj
1460
1461 @staticmethod
1462 def _output_to_lines(output: str) -> list:
1463 output_lines = list()
1464 lines = output.splitlines(keepends=False)
1465 for line in lines:
1466 line = line.strip()
1467 if len(line) > 0:
1468 output_lines.append(line)
1469 return output_lines
1470
1471 @staticmethod
1472 def _output_to_table(output: str) -> list:
1473 output_table = list()
1474 lines = output.splitlines(keepends=False)
1475 for line in lines:
1476 line = line.replace("\t", " ")
1477 line_list = list()
1478 output_table.append(line_list)
1479 cells = line.split(sep=" ")
1480 for cell in cells:
1481 cell = cell.strip()
1482 if len(cell) > 0:
1483 line_list.append(cell)
1484 return output_table
1485
1486 @staticmethod
1487 def _parse_services(output: str) -> list:
1488 lines = output.splitlines(keepends=False)
1489 services = []
1490 for line in lines:
1491 line = line.replace("\t", " ")
1492 cells = line.split(sep=" ")
1493 if len(cells) > 0 and cells[0].startswith("service/"):
1494 elems = cells[0].split(sep="/")
1495 if len(elems) > 1:
1496 services.append(elems[1])
1497 return services
1498
1499 @staticmethod
1500 def _get_deep(dictionary: dict, members: tuple):
1501 target = dictionary
1502 value = None
1503 try:
1504 for m in members:
1505 value = target.get(m)
1506 if not value:
1507 return None
1508 else:
1509 target = value
1510 except Exception:
1511 pass
1512 return value
1513
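    # Example of _get_deep on a parsed kubectl service document (illustrative input):
    #
    #     K8sHelmBaseConnector._get_deep({"spec": {"type": "ClusterIP"}}, ("spec", "type"))
    #     # -> "ClusterIP"
    #     K8sHelmBaseConnector._get_deep({"spec": {}}, ("spec", "type"))
    #     # -> None
    #
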
1514 # find key:value in several lines
1515 @staticmethod
1516 def _find_in_lines(p_lines: list, p_key: str) -> str:
1517 for line in p_lines:
1518 try:
1519 if line.startswith(p_key + ":"):
1520 parts = line.split(":")
1521 the_value = parts[1].strip()
1522 return the_value
1523 except Exception:
1524 # ignore it
1525 pass
1526 return None
1527
1528 @staticmethod
1529 def _lower_keys_list(input_list: list):
1530 """
1531 Transform the keys in a list of dictionaries to lower case and returns a new list
1532 of dictionaries
1533 """
1534 new_list = []
1535 if input_list:
1536 for dictionary in input_list:
1537 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1538 new_list.append(new_dict)
1539 return new_list
1540
1541 async def _local_async_exec(
1542 self,
1543 command: str,
1544 raise_exception_on_error: bool = False,
1545 show_error_log: bool = True,
1546 encode_utf8: bool = False,
1547 env: dict = None,
1548 ) -> (str, int):
1549
1550 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1551 self.log.debug(
1552 "Executing async local command: {}, env: {}".format(command, env)
1553 )
1554
1555 # split command
1556 command = shlex.split(command)
1557
1558 environ = os.environ.copy()
1559 if env:
1560 environ.update(env)
1561
1562 try:
1563 async with self.cmd_lock:
1564 process = await asyncio.create_subprocess_exec(
1565 *command,
1566 stdout=asyncio.subprocess.PIPE,
1567 stderr=asyncio.subprocess.PIPE,
1568 env=environ,
1569 )
1570
1571 # wait for command terminate
1572 stdout, stderr = await process.communicate()
1573
1574 return_code = process.returncode
1575
1576 output = ""
1577 if stdout:
1578 output = stdout.decode("utf-8").strip()
1579 # output = stdout.decode()
1580 if stderr:
1581 output = stderr.decode("utf-8").strip()
1582 # output = stderr.decode()
1583
1584 if return_code != 0 and show_error_log:
1585 self.log.debug(
1586 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1587 )
1588 else:
1589 self.log.debug("Return code: {}".format(return_code))
1590
1591 if raise_exception_on_error and return_code != 0:
1592 raise K8sException(output)
1593
1594 if encode_utf8:
1595 output = output.encode("utf-8").strip()
1596 output = str(output).replace("\\n", "\n")
1597
1598 return output, return_code
1599
1600 except asyncio.CancelledError:
1601 # first, kill the process if it is still running
1602 if process.returncode is None:
1603 process.kill()
1604 raise
1605 except K8sException:
1606 raise
1607 except Exception as e:
1608 msg = "Exception executing command: {} -> {}".format(command, e)
1609 self.log.error(msg)
1610 if raise_exception_on_error:
1611 raise K8sException(e) from e
1612 else:
1613 return "", -1
1614
1615 async def _local_async_exec_pipe(
1616 self,
1617 command1: str,
1618 command2: str,
1619 raise_exception_on_error: bool = True,
1620 show_error_log: bool = True,
1621 encode_utf8: bool = False,
1622 env: dict = None,
1623 ):
1624
1625 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1626 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1627 command = "{} | {}".format(command1, command2)
1628 self.log.debug(
1629 "Executing async local command: {}, env: {}".format(command, env)
1630 )
1631
1632 # split command
1633 command1 = shlex.split(command1)
1634 command2 = shlex.split(command2)
1635
1636 environ = os.environ.copy()
1637 if env:
1638 environ.update(env)
1639
1640 try:
1641 async with self.cmd_lock:
1642 read, write = os.pipe()
1643 process_1 = await asyncio.create_subprocess_exec(
1644 *command1, stdout=write, env=environ
1645 )
1646 os.close(write)
1647 process_2 = await asyncio.create_subprocess_exec(
1648 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1649 )
1650 os.close(read)
1651 stdout, stderr = await process_2.communicate()
1652
1653 return_code = process_2.returncode
1654
1655 output = ""
1656 if stdout:
1657 output = stdout.decode("utf-8").strip()
1658 # output = stdout.decode()
1659 if stderr:
1660 output = stderr.decode("utf-8").strip()
1661 # output = stderr.decode()
1662
1663 if return_code != 0 and show_error_log:
1664 self.log.debug(
1665 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1666 )
1667 else:
1668 self.log.debug("Return code: {}".format(return_code))
1669
1670 if raise_exception_on_error and return_code != 0:
1671 raise K8sException(output)
1672
1673 if encode_utf8:
1674 output = output.encode("utf-8").strip()
1675 output = str(output).replace("\\n", "\n")
1676
1677 return output, return_code
1678 except asyncio.CancelledError:
1679 # first, kill the processes if they are still running
1680 for process in (process_1, process_2):
1681 if process.returncode is None:
1682 process.kill()
1683 raise
1684 except K8sException:
1685 raise
1686 except Exception as e:
1687 msg = "Exception executing command: {} -> {}".format(command, e)
1688 self.log.error(msg)
1689 if raise_exception_on_error:
1690 raise K8sException(e) from e
1691 else:
1692 return "", -1
1693
1694 async def _get_service(self, cluster_id, service_name, namespace):
1695 """
1696 Obtains the data of the specified service in the k8cluster.
1697
1698 :param cluster_id: id of a K8s cluster known by OSM
1699 :param service_name: name of the K8s service in the specified namespace
1700 :param namespace: K8s namespace used by the KDU instance
1701 :return: If successful, it will return a service with the following data:
1702 - `name` of the service
            - `type` type of service in the k8s cluster
            - `ports` List of ports offered by the service; each port includes at least
                name, port, protocol
1706 - `cluster_ip` Internal ip to be used inside k8s cluster
1707 - `external_ip` List of external ips (in case they are available)
1708 """
1709
1710 # init config, env
1711 paths, env = self._init_paths_env(
1712 cluster_name=cluster_id, create_if_not_exist=True
1713 )
1714
1715 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1716 self.kubectl_command, paths["kube_config"], namespace, service_name
1717 )
1718
1719 output, _rc = await self._local_async_exec(
1720 command=command, raise_exception_on_error=True, env=env
1721 )
1722
1723 data = yaml.load(output, Loader=yaml.SafeLoader)
1724
1725 service = {
1726 "name": service_name,
1727 "type": self._get_deep(data, ("spec", "type")),
1728 "ports": self._get_deep(data, ("spec", "ports")),
1729 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1730 }
1731 if service["type"] == "LoadBalancer":
1732 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1733 ip_list = [elem["ip"] for elem in ip_map_list]
1734 service["external_ip"] = ip_list
1735
1736 return service
1737
1738 async def _exec_get_command(
1739 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1740 ):
1741 """Obtains information about the kdu instance."""
1742
1743 full_command = self._get_get_command(
1744 get_command, kdu_instance, namespace, kubeconfig
1745 )
1746
1747 output, _rc = await self._local_async_exec(command=full_command)
1748
1749 return output
1750
1751 async def _exec_inspect_command(
1752 self, inspect_command: str, kdu_model: str, repo_url: str = None
1753 ):
1754 """Obtains information about an Helm Chart package (´helm show´ command)
1755
1756 Args:
1757 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1758 kdu_model: The name or path of an Helm Chart
1759 repo_url: Helm Chart repository url
1760
1761 Returns:
1762 str: the requested info about the Helm Chart package
1763 """
1764
1765 repo_str = ""
1766 if repo_url:
1767 repo_str = " --repo {}".format(repo_url)
1768
1769 # Obtain the Chart's name and store it in the var kdu_model
1770 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1771
1772 kdu_model, version = self._split_version(kdu_model)
1773 if version:
1774 version_str = "--version {}".format(version)
1775 else:
1776 version_str = ""
1777
1778 full_command = self._get_inspect_command(
1779 show_command=inspect_command,
1780 kdu_model=kdu_model,
1781 repo_str=repo_str,
1782 version=version_str,
1783 )
1784
1785 output, _ = await self._local_async_exec(command=full_command)
1786
1787 return output
1788
1789 async def _get_replica_count_url(
1790 self,
1791 kdu_model: str,
1792 repo_url: str = None,
1793 resource_name: str = None,
1794 ) -> (int, str):
1795 """Get the replica count value in the Helm Chart Values.
1796
1797 Args:
            kdu_model: The name or path of a Helm Chart
1799 repo_url: Helm Chart repository url
1800 resource_name: Resource name
1801
1802 Returns:
1803 A tuple with:
1804 - The number of replicas of the specific instance; if not found, returns None; and
1805 - The string corresponding to the replica count key in the Helm values
1806 """
1807
1808 kdu_values = yaml.load(
1809 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1810 Loader=yaml.SafeLoader,
1811 )
1812
1813 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1814
1815 if not kdu_values:
1816 raise K8sException(
1817 "kdu_values not found for kdu_model {}".format(kdu_model)
1818 )
1819
1820 if resource_name:
1821 kdu_values = kdu_values.get(resource_name, None)
1822
1823 if not kdu_values:
1824 msg = "resource {} not found in the values in model {}".format(
1825 resource_name, kdu_model
1826 )
1827 self.log.error(msg)
1828 raise K8sException(msg)
1829
1830 duplicate_check = False
1831
1832 replica_str = ""
1833 replicas = None
1834
1835 if kdu_values.get("replicaCount") is not None:
1836 replicas = kdu_values["replicaCount"]
1837 replica_str = "replicaCount"
1838 elif kdu_values.get("replicas") is not None:
1839 duplicate_check = True
1840 replicas = kdu_values["replicas"]
1841 replica_str = "replicas"
1842 else:
1843 if resource_name:
1844 msg = (
1845 "replicaCount or replicas not found in the resource"
1846 "{} values in model {}. Cannot be scaled".format(
1847 resource_name, kdu_model
1848 )
1849 )
1850 else:
1851 msg = (
1852 "replicaCount or replicas not found in the values"
1853 "in model {}. Cannot be scaled".format(kdu_model)
1854 )
1855 self.log.error(msg)
1856 raise K8sException(msg)
1857
        # Check whether replicas and replicaCount exist at the same time
        msg = "replicaCount and replicas exist at the same time"
1860 if duplicate_check:
1861 if "replicaCount" in kdu_values:
1862 self.log.error(msg)
1863 raise K8sException(msg)
1864 else:
1865 if "replicas" in kdu_values:
1866 self.log.error(msg)
1867 raise K8sException(msg)
1868
1869 return replicas, replica_str
1870
1871 async def _get_replica_count_instance(
1872 self,
1873 kdu_instance: str,
1874 namespace: str,
1875 kubeconfig: str,
1876 resource_name: str = None,
1877 ) -> int:
1878 """Get the replica count value in the instance.
1879
1880 Args:
1881 kdu_instance: The name of the KDU instance
1882 namespace: KDU instance namespace
            kubeconfig: Path to the kubeconfig file used to access the cluster
1884 resource_name: Resource name
1885
1886 Returns:
1887 The number of replicas of the specific instance; if not found, returns None
1888 """
1889
1890 kdu_values = yaml.load(
1891 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1892 Loader=yaml.SafeLoader,
1893 )
1894
1895 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1896
1897 replicas = None
1898
1899 if kdu_values:
1900 resource_values = (
1901 kdu_values.get(resource_name, None) if resource_name else None
1902 )
1903
1904 for replica_str in ("replicaCount", "replicas"):
1905 if resource_values:
1906 replicas = resource_values.get(replica_str)
1907 else:
1908 replicas = kdu_values.get(replica_str)
1909
1910 if replicas is not None:
1911 break
1912
1913 return replicas
1914
1915 async def _store_status(
1916 self,
1917 cluster_id: str,
1918 operation: str,
1919 kdu_instance: str,
1920 namespace: str = None,
1921 db_dict: dict = None,
1922 ) -> None:
1923 """
1924 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1925
1926 :param cluster_id (str): the cluster where the KDU instance is deployed
1927 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1928 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1929 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1930 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1931 values for the keys:
1932 - "collection": The Mongo DB collection to write to
1933 - "filter": The query filter to use in the update process
1934 - "path": The dot separated keys which targets the object to be updated
1935 Defaults to None.
1936 """
1937
1938 try:
1939 detailed_status = await self._status_kdu(
1940 cluster_id=cluster_id,
1941 kdu_instance=kdu_instance,
1942 yaml_format=False,
1943 namespace=namespace,
1944 )
1945
1946 status = detailed_status.get("info").get("description")
1947 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1948
1949 # write status to db
1950 result = await self.write_app_status_to_db(
1951 db_dict=db_dict,
1952 status=str(status),
1953 detailed_status=str(detailed_status),
1954 operation=operation,
1955 )
1956
1957 if not result:
1958 self.log.info("Error writing in database. Task exiting...")
1959
1960 except asyncio.CancelledError as e:
1961 self.log.warning(
1962 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1963 )
1964 except Exception as e:
1965 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1966
1967 # params for use in -f file
1968 # returns values file option and filename (in order to delete it at the end)
1969 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1970
1971 if params and len(params) > 0:
1972 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1973
1974 def get_random_number():
1975 r = random.randrange(start=1, stop=99999999)
1976 s = str(r)
1977 while len(s) < 10:
1978 s = "0" + s
1979 return s
1980
1981 params2 = dict()
1982 for key in params:
1983 value = params.get(key)
1984 if "!!yaml" in str(value):
1985 value = yaml.safe_load(value[7:])
1986 params2[key] = value
1987
1988 values_file = get_random_number() + ".yaml"
1989 with open(values_file, "w") as stream:
1990 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1991
1992 return "-f {}".format(values_file), values_file
1993
1994 return "", None
1995
1996 # params for use in --set option
1997 @staticmethod
1998 def _params_to_set_option(params: dict) -> str:
1999 params_str = ""
2000 if params and len(params) > 0:
2001 start = True
2002 for key in params:
2003 value = params.get(key, None)
2004 if value is not None:
2005 if start:
2006 params_str += "--set "
2007 start = False
2008 else:
2009 params_str += ","
2010 params_str += "{}={}".format(key, value)
2011 return params_str
2012
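    # Example of _params_to_set_option output (illustrative params):
    #
    #     K8sHelmBaseConnector._params_to_set_option(
    #         {"replicaCount": 2, "service.type": "NodePort"}
    #     )
    #     # -> "--set replicaCount=2,service.type=NodePort"
    #
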
2013 @staticmethod
2014 def generate_kdu_instance_name(**kwargs):
2015 chart_name = kwargs["kdu_model"]
        # check embedded chart (file or dir)
2017 if chart_name.startswith("/"):
2018 # extract file or directory name
2019 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2020 # check URL
2021 elif "://" in chart_name:
2022 # extract last portion of URL
2023 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2024
2025 name = ""
2026 for c in chart_name:
2027 if c.isalpha() or c.isnumeric():
2028 name += c
2029 else:
2030 name += "-"
2031 if len(name) > 35:
2032 name = name[0:35]
2033
        # if it does not start with an alphabetic character, prefix it with 'a'
2035 if not name[0].isalpha():
2036 name = "a" + name
2037
2038 name += "-"
2039
2040 def get_random_number():
2041 r = random.randrange(start=1, stop=99999999)
2042 s = str(r)
2043 s = s.rjust(10, "0")
2044 return s
2045
2046 name = name + get_random_number()
2047 return name.lower()
2048
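    # Example of generate_kdu_instance_name (the 10-digit suffix is random; the value
    # below only illustrates the shape of the result):
    #
    #     K8sHelmBaseConnector.generate_kdu_instance_name(kdu_model="stable/mariadb:10.1.0")
    #     # -> e.g. "stable-mariadb-10-1-0-0012345678"
    #
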
2049 def _split_version(self, kdu_model: str) -> (str, str):
2050 version = None
2051 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
2052 parts = kdu_model.split(sep=":")
2053 if len(parts) == 2:
2054 version = str(parts[1])
2055 kdu_model = parts[0]
2056 return kdu_model, version
2057
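    # Example of _split_version (illustrative model strings; `connector` stands for a
    # concrete subclass instance):
    #
    #     connector._split_version("stable/mariadb:10.1.0")  # -> ("stable/mariadb", "10.1.0")
    #     connector._split_version("stable/mariadb")          # -> ("stable/mariadb", None)
    #
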
2058 def _split_repo(self, kdu_model: str) -> (str, str):
2059 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2060
2061 Args:
2062 kdu_model (str): Associated KDU model
2063
2064 Returns:
            (str, str): Tuple with the Chart name in index 0, and the repo name
                in index 1; if there was a problem finding them, return None
                for both
2068 """
2069
2070 chart_name = None
2071 repo_name = None
2072
2073 idx = kdu_model.find("/")
2074 if idx >= 0:
2075 chart_name = kdu_model[idx + 1 :]
2076 repo_name = kdu_model[:idx]
2077
2078 return chart_name, repo_name
2079
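    # Example of _split_repo (illustrative model strings; `connector` stands for a
    # concrete subclass instance):
    #
    #     connector._split_repo("stable/mariadb")  # -> ("mariadb", "stable")
    #     connector._split_repo("mariadb")         # -> (None, None)
    #
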
2080 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2081 """Obtain the Helm repository for an Helm Chart
2082
2083 Args:
2084 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2085 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2086
2087 Returns:
            str: the repository URL; if the Helm Chart is a local one, the function returns None
2089 """
2090
2091 _, repo_name = self._split_repo(kdu_model=kdu_model)
2092
2093 repo_url = None
2094 if repo_name:
2095 # Find repository link
2096 local_repo_list = await self.repo_list(cluster_uuid)
2097 for repo in local_repo_list:
2098 if repo["name"] == repo_name:
2099 repo_url = repo["url"]
2100 break # it is not necessary to continue the loop if the repo link was found...
2101
2102 return repo_url
2103
2104 async def create_certificate(
2105 self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2106 ):
2107 paths, env = self._init_paths_env(
2108 cluster_name=cluster_uuid, create_if_not_exist=True
2109 )
2110 kubectl = Kubectl(config_file=paths["kube_config"])
2111 await kubectl.create_certificate(
2112 namespace=namespace,
2113 name=name,
2114 dns_prefix=dns_prefix,
2115 secret_name=secret_name,
2116 usages=[usage],
2117 issuer_name="ca-issuer",
2118 )
2119
2120 async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2121 paths, env = self._init_paths_env(
2122 cluster_name=cluster_uuid, create_if_not_exist=True
2123 )
2124 kubectl = Kubectl(config_file=paths["kube_config"])
2125 await kubectl.delete_certificate(namespace, certificate_name)