Feature 10956: Add namespace and force arguments to helm upgrade
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """Initialize the base helm connector.

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        :raises Exception: if kubectl or helm binaries are not found on disk
            (via _check_file_exists)
        """

        # parent class constructor (sets self.db, self.log, callback)
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        self.config = EnvironConfig()
        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # obtain stable repo url from config or apply default
        # NOTE: the env var may carry the literal string "None" to disable it
        self._stable_repo_url = self.config.get("stablerepourl")
        if self._stable_repo_url == "None":
            self._stable_repo_url = None

        # Lock to avoid concurrent execution of helm commands
        self.cmd_lock = asyncio.Lock()
95
96 def _get_namespace(self, cluster_uuid: str) -> str:
97 """
98 Obtains the namespace used by the cluster with the uuid passed by argument
99
100 param: cluster_uuid: cluster's uuid
101 """
102
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster = self.db.get_one(
105 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
106 )
107 return k8scluster.get("namespace")
108
109 async def init_env(
110 self,
111 k8s_creds: str,
112 namespace: str = "kube-system",
113 reuse_cluster_uuid=None,
114 **kwargs,
115 ) -> (str, bool):
116 """
117 It prepares a given K8s cluster environment to run Charts
118
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
120 '.kube/config'
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
124 :param kwargs: Additional parameters (None yet)
125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
128 """
129
130 if reuse_cluster_uuid:
131 cluster_id = reuse_cluster_uuid
132 else:
133 cluster_id = str(uuid4())
134
135 self.log.debug(
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
137 )
138
139 paths, env = self._init_paths_env(
140 cluster_name=cluster_id, create_if_not_exist=True
141 )
142 mode = stat.S_IRUSR | stat.S_IWUSR
143 with open(paths["kube_config"], "w", mode) as f:
144 f.write(k8s_creds)
145 os.chmod(paths["kube_config"], 0o600)
146
147 # Code with initialization specific of helm version
148 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
149
150 # sync fs with local data
151 self.fs.reverse_sync(from_path=cluster_id)
152
153 self.log.info("Cluster {} initialized".format(cluster_id))
154
155 return cluster_id, n2vc_installed_sw
156
    async def repo_add(
        self,
        cluster_uuid: str,
        name: str,
        url: str,
        repo_type: str = "chart",
        cert: str = None,
        user: str = None,
        password: str = None,
    ):
        """Add a helm repository to a cluster and refresh its index.

        :param cluster_uuid: UUID of the target K8s cluster
        :param name: repository name
        :param url: repository url
        :param repo_type: repository type (informational, defaults to "chart")
        :param cert: optional CA certificate content, written to a temp file and
            passed to helm via --ca-file
        :param user: optional repository username
        :param password: optional repository password
        """
        self.log.debug(
            "Cluster {}, adding {} repository {}. URL: {}".format(
                cluster_uuid, repo_type, name, url
            )
        )

        # init_env
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # helm repo add name url
        command = ("env KUBECONFIG={} {} repo add {} {}").format(
            paths["kube_config"], self._helm_command, name, url
        )

        if cert:
            # persist the CA certificate so helm can read it from disk
            temp_cert_file = os.path.join(
                self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
            )
            os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
            with open(temp_cert_file, "w") as the_cert:
                the_cert.write(cert)
            command += " --ca-file {}".format(temp_cert_file)

        if user:
            command += " --username={}".format(user)

        if password:
            command += " --password={}".format(password)

        # NOTE(review): the debug line below logs the full command, including
        # the repository password when one is provided — confirm whether this
        # should be redacted before logging.
        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        # helm repo update
        command = "env KUBECONFIG={} {} repo update {}".format(
            paths["kube_config"], self._helm_command, name
        )
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)
217
218 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
219 self.log.debug(
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid, repo_type, name
222 )
223 )
224
225 # init_env
226 paths, env = self._init_paths_env(
227 cluster_name=cluster_uuid, create_if_not_exist=True
228 )
229
230 # sync local dir
231 self.fs.sync(from_path=cluster_uuid)
232
233 # helm repo update
234 command = "{} repo update {}".format(self._helm_command, name)
235 self.log.debug("updating repo: {}".format(command))
236 await self._local_async_exec(
237 command=command, raise_exception_on_error=False, env=env
238 )
239
240 # sync fs
241 self.fs.reverse_sync(from_path=cluster_uuid)
242
243 async def repo_list(self, cluster_uuid: str) -> list:
244 """
245 Get the list of registered repositories
246
247 :return: list of registered repositories: [ (name, url) .... ]
248 """
249
250 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
251
252 # config filename
253 paths, env = self._init_paths_env(
254 cluster_name=cluster_uuid, create_if_not_exist=True
255 )
256
257 # sync local dir
258 self.fs.sync(from_path=cluster_uuid)
259
260 command = "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths["kube_config"], self._helm_command
262 )
263
264 # Set exception to false because if there are no repos just want an empty list
265 output, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=False, env=env
267 )
268
269 # sync fs
270 self.fs.reverse_sync(from_path=cluster_uuid)
271
272 if _rc == 0:
273 if output and len(output) > 0:
274 repos = yaml.load(output, Loader=yaml.SafeLoader)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self._lower_keys_list(repos)
277 else:
278 return []
279 else:
280 return []
281
282 async def repo_remove(self, cluster_uuid: str, name: str):
283 self.log.debug(
284 "remove {} repositories for cluster {}".format(name, cluster_uuid)
285 )
286
287 # init env, paths
288 paths, env = self._init_paths_env(
289 cluster_name=cluster_uuid, create_if_not_exist=True
290 )
291
292 # sync local dir
293 self.fs.sync(from_path=cluster_uuid)
294
295 command = "env KUBECONFIG={} {} repo remove {}".format(
296 paths["kube_config"], self._helm_command, name
297 )
298 await self._local_async_exec(
299 command=command, raise_exception_on_error=True, env=env
300 )
301
302 # sync fs
303 self.fs.reverse_sync(from_path=cluster_uuid)
304
    async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False,
        **kwargs,
    ) -> bool:
        """Reset a cluster

        Resets the Kubernetes cluster by removing the helm deployment that represents it.

        :param cluster_uuid: The UUID of the cluster to reset
        :param force: uninstall deployed releases even if present (best effort)
        :param uninstall_sw: also uninstall the software this connector installed
            in the cluster
        :param kwargs: Additional parameters (None yet)
        :return: Returns True if successful or raises an exception.
        """
        namespace = self._get_namespace(cluster_uuid=cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_uuid, uninstall_sw
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    # best-effort uninstall of every release before removing
                    # the connector software
                    for r in releases:
                        try:
                            kdu_instance = r.get("name")
                            chart = r.get("chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            # will not raise exception as it was found
                            # that in some cases of previously installed helm releases it
                            # raised an error
                            self.log.warn(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    # releases exist and force is off: keep the helm software
                    msg = (
                        "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                    ).format(cluster_uuid)
                    self.log.warn(msg)
                    uninstall_sw = (
                        False  # Allow to remove k8s cluster without removing Tiller
                    )

        if uninstall_sw:
            await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

        # delete cluster directory
        self.log.debug("Removing directory {}".format(cluster_uuid))
        self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
        # Remove also the local directory if it still exists
        direct = self.fs.path + "/" + cluster_uuid
        shutil.rmtree(direct, ignore_errors=True)

        return True
376
377 def _is_helm_chart_a_file(self, chart_name: str):
378 return chart_name.count("/") > 1
379
    async def _install_impl(
        self,
        cluster_id: str,
        kdu_model: str,
        paths: dict,
        env: dict,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """Install a KDU instance (helm-version-independent implementation).

        :param cluster_id: internal cluster id
        :param kdu_model: chart reference, optionally suffixed with ":version"
        :param paths: cluster paths (NOTE: recomputed below, argument overwritten)
        :param env: helm environment (NOTE: recomputed below, argument overwritten)
        :param kdu_instance: release name to install
        :param atomic: if True, run helm in a task while storing status in
            parallel until the command finishes
        :param timeout: timeout in seconds for the helm command
        :param params: chart values, written to a temporary values file
        :param db_dict: where to write operation status in the database
        :param kdu_name: KDU name (not used in this method)
        :param namespace: target K8s namespace
        :raises K8sException: if the helm command exits with a non-zero code
        """
        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        # version
        kdu_model, version = self._split_version(kdu_model)

        # refresh the repo index so the requested chart/version can be resolved
        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_id, repo)

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)
474
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        namespace: str = None,
        force: bool = False,
    ):
        """Upgrade a deployed KDU instance.

        :param cluster_uuid: UUID of the K8s cluster
        :param kdu_instance: unique name of the release to upgrade
        :param kdu_model: chart reference, optionally suffixed with ":version"
        :param atomic: if True, run helm in a task while storing status in
            parallel until the command finishes
        :param timeout: timeout in seconds for the helm command
        :param params: chart values, written to a temporary values file
        :param db_dict: where to write operation status in the database
        :param namespace: target namespace; when not given it is obtained from
            the deployed instance
        :param force: forwarded to the helm upgrade command builder
        :return: new revision number of the release, or 0 if it cannot be read
        :raises K8sException: if the instance is not found (when namespace must
            be looked up) or the helm command fails
        """
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace

        # set namespace
        if not namespace:
            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
            if not instance_info:
                raise K8sException("kdu_instance {} not found".format(kdu_instance))
            namespace = instance_info["namespace"]

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_uuid, params=params
        )

        # version
        kdu_model, version = self._split_version(kdu_model)

        # refresh the repo index so the requested chart/version can be resolved
        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_uuid, repo)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
            force,
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="upgrade",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
598
    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        cluster_uuid: str = None,
        kdu_model: str = None,
        atomic: bool = True,
        db_dict: dict = None,
        **kwargs,
    ):
        """Scale a resource in a Helm Chart.

        Args:
            kdu_instance: KDU instance name
            scale: Scale to which to set the resource
            resource_name: Resource name
            total_timeout: The time, in seconds, to wait
            cluster_uuid: The UUID of the cluster
            kdu_model: The chart reference
            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            db_dict: Dictionary for any additional data
            kwargs: Additional parameters

        Returns:
            True if successful, False otherwise

        Raises:
            K8sException: if the instance is not found or the helm command fails
        """

        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_mgs = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_mgs)

        # look for instance to obtain namespace
        # get_instance_info function calls the sync command
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # version
        kdu_model, version = self._split_version(kdu_model)

        # locate the chart's repository to read the replica-count key name
        repo_url = await self._find_repo(kdu_model, cluster_uuid)

        _, replica_str = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        command = self._get_upgrade_scale_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            scale,
            version,
            atomic,
            replica_str,
            total_timeout,
            resource_name,
            paths["kube_config"],
        )

        self.log.debug("scaling: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="scale",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="scale",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return True
720
    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        cluster_uuid: str,
        kdu_model: str,
        **kwargs,
    ) -> int:
        """Get a resource scale count.

        Args:
            cluster_uuid: The UUID of the cluster
            resource_name: Resource name
            kdu_instance: KDU instance name
            kdu_model: The name or path of an Helm Chart
            kwargs: Additional parameters

        Returns:
            Resource instance count

        Raises:
            K8sException: if the instance is not found or no replica count can
                be determined from the instance or the chart defaults
        """

        self.log.debug(
            "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
        )

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, _ = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # first try: replica count from the values applied to the instance
        replicas = await self._get_replica_count_instance(
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            kubeconfig=paths["kube_config"],
            resource_name=resource_name,
        )

        self.log.debug(
            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
        )

        # Get default value if scale count is not found from provided values
        # Important note: this piece of code shall only be executed in the first scaling operation,
        # since it is expected that the _get_replica_count_instance is able to obtain the number of
        # replicas when a scale operation was already conducted previously for this KDU/resource!
        if replicas is None:
            repo_url = await self._find_repo(
                kdu_model=kdu_model, cluster_uuid=cluster_uuid
            )
            replicas, _ = await self._get_replica_count_url(
                kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
            )

            self.log.debug(
                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
                f"{resource_name} obtained: {replicas}"
            )

            if replicas is None:
                msg = "Replica count not found. Cannot be scaled"
                self.log.error(msg)
                raise K8sException(msg)

        return int(replicas)
790
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """Roll back a KDU instance to a previous revision.

        :param cluster_uuid: UUID of the K8s cluster
        :param kdu_instance: unique name of the release to roll back
        :param revision: target revision (0 is passed through to the helm
            rollback command builder)
        :param db_dict: where to write operation status in the database
        :return: new revision number of the release, or 0 if it cannot be read
        :raises K8sException: if the instance is not found or helm fails
        """
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_rollback_command(
            kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
        )

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
874
875 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
876 """
877 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
878 (this call should happen after all _terminate-config-primitive_ of the VNF
879 are invoked).
880
881 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
882 :param kdu_instance: unique name for the KDU instance to be deleted
883 :param kwargs: Additional parameters (None yet)
884 :return: True if successful
885 """
886
887 self.log.debug(
888 "uninstall kdu_instance {} from cluster {}".format(
889 kdu_instance, cluster_uuid
890 )
891 )
892
893 # sync local dir
894 self.fs.sync(from_path=cluster_uuid)
895
896 # look for instance to obtain namespace
897 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
898 if not instance_info:
899 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
900 return True
901 # init env, paths
902 paths, env = self._init_paths_env(
903 cluster_name=cluster_uuid, create_if_not_exist=True
904 )
905
906 # sync local dir
907 self.fs.sync(from_path=cluster_uuid)
908
909 command = self._get_uninstall_command(
910 kdu_instance, instance_info["namespace"], paths["kube_config"]
911 )
912 output, _rc = await self._local_async_exec(
913 command=command, raise_exception_on_error=True, env=env
914 )
915
916 # sync fs
917 self.fs.reverse_sync(from_path=cluster_uuid)
918
919 return self._output_to_table(output)
920
921 async def instances_list(self, cluster_uuid: str) -> list:
922 """
923 returns a list of deployed releases in a cluster
924
925 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
926 :return:
927 """
928
929 self.log.debug("list releases for cluster {}".format(cluster_uuid))
930
931 # sync local dir
932 self.fs.sync(from_path=cluster_uuid)
933
934 # execute internal command
935 result = await self._instances_list(cluster_uuid)
936
937 # sync fs
938 self.fs.reverse_sync(from_path=cluster_uuid)
939
940 return result
941
942 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
943 instances = await self.instances_list(cluster_uuid=cluster_uuid)
944 for instance in instances:
945 if instance.get("name") == kdu_instance:
946 return instance
947 self.log.debug("Instance {} not found".format(kdu_instance))
948 return None
949
950 async def upgrade_charm(
951 self,
952 ee_id: str = None,
953 path: str = None,
954 charm_id: str = None,
955 charm_type: str = None,
956 timeout: float = None,
957 ) -> str:
958 """This method upgrade charms in VNFs
959
960 Args:
961 ee_id: Execution environment id
962 path: Local path to the charm
963 charm_id: charm-id
964 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
965 timeout: (Float) Timeout for the ns update operation
966
967 Returns:
968 The output of the update operation if status equals to "completed"
969 """
970 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
971
972 async def exec_primitive(
973 self,
974 cluster_uuid: str = None,
975 kdu_instance: str = None,
976 primitive_name: str = None,
977 timeout: float = 300,
978 params: dict = None,
979 db_dict: dict = None,
980 **kwargs,
981 ) -> str:
982 """Exec primitive (Juju action)
983
984 :param cluster_uuid: The UUID of the cluster or namespace:cluster
985 :param kdu_instance: The unique name of the KDU instance
986 :param primitive_name: Name of action that will be executed
987 :param timeout: Timeout for action execution
988 :param params: Dictionary of all the parameters needed for the action
989 :db_dict: Dictionary for any additional data
990 :param kwargs: Additional parameters (None yet)
991
992 :return: Returns the output of the action
993 """
994 raise K8sException(
995 "KDUs deployed with Helm don't support actions "
996 "different from rollback, upgrade and status"
997 )
998
999 async def get_services(
1000 self, cluster_uuid: str, kdu_instance: str, namespace: str
1001 ) -> list:
1002 """
1003 Returns a list of services defined for the specified kdu instance.
1004
1005 :param cluster_uuid: UUID of a K8s cluster known by OSM
1006 :param kdu_instance: unique name for the KDU instance
1007 :param namespace: K8s namespace used by the KDU instance
1008 :return: If successful, it will return a list of services, Each service
1009 can have the following data:
1010 - `name` of the service
1011 - `type` type of service in the k8 cluster
1012 - `ports` List of ports offered by the service, for each port includes at least
1013 name, port, protocol
1014 - `cluster_ip` Internal ip to be used inside k8s cluster
1015 - `external_ip` List of external ips (in case they are available)
1016 """
1017
1018 self.log.debug(
1019 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1020 cluster_uuid, kdu_instance
1021 )
1022 )
1023
1024 # init env, paths
1025 paths, env = self._init_paths_env(
1026 cluster_name=cluster_uuid, create_if_not_exist=True
1027 )
1028
1029 # sync local dir
1030 self.fs.sync(from_path=cluster_uuid)
1031
1032 # get list of services names for kdu
1033 service_names = await self._get_services(
1034 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1035 )
1036
1037 service_list = []
1038 for service in service_names:
1039 service = await self._get_service(cluster_uuid, service, namespace)
1040 service_list.append(service)
1041
1042 # sync fs
1043 self.fs.reverse_sync(from_path=cluster_uuid)
1044
1045 return service_list
1046
1047 async def get_service(
1048 self, cluster_uuid: str, service_name: str, namespace: str
1049 ) -> object:
1050
1051 self.log.debug(
1052 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1053 service_name, namespace, cluster_uuid
1054 )
1055 )
1056
1057 # sync local dir
1058 self.fs.sync(from_path=cluster_uuid)
1059
1060 service = await self._get_service(cluster_uuid, service_name, namespace)
1061
1062 # sync fs
1063 self.fs.reverse_sync(from_path=cluster_uuid)
1064
1065 return service
1066
    async def status_kdu(
        self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
    ) -> Union[str, dict]:
        """
        This call would retrieve tha current state of a given KDU instance. It would be
        would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
        values_ of the configuration parameters applied to a given instance. This call
        would be based on the `status` call.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :param yaml_format: if the return shall be returned as an YAML string or as a
                                dictionary
        :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
              - UNKNOWN
              - DEPLOYED
              - DELETED
              - SUPERSEDED
              - FAILED or
              - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
          and the status of those resources
        - Last `deployment_time`.

        :raises K8sException: if the instance is not found in the cluster
        """
        self.log.debug(
            "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # get instance: needed to obtain namespace
        # (for/else: the else branch runs only when the loop finished without
        # break, i.e. no release matched the requested instance name)
        instances = await self._instances_list(cluster_id=cluster_uuid)
        for instance in instances:
            if instance.get("name") == kdu_instance:
                break
        else:
            # instance does not exist
            raise K8sException(
                "Instance name: {} not found in cluster: {}".format(
                    kdu_instance, cluster_uuid
                )
            )

        status = await self._status_kdu(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance["namespace"],
            yaml_format=yaml_format,
            show_error_log=True,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return status
1129
1130 async def get_values_kdu(
1131 self, kdu_instance: str, namespace: str, kubeconfig: str
1132 ) -> str:
1133
1134 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1135
1136 return await self._exec_get_command(
1137 get_command="values",
1138 kdu_instance=kdu_instance,
1139 namespace=namespace,
1140 kubeconfig=kubeconfig,
1141 )
1142
1143 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1144 """Method to obtain the Helm Chart package's values
1145
1146 Args:
1147 kdu_model: The name or path of an Helm Chart
1148 repo_url: Helm Chart repository url
1149
1150 Returns:
1151 str: the values of the Helm Chart package
1152 """
1153
1154 self.log.debug(
1155 "inspect kdu_model values {} from (optional) repo: {}".format(
1156 kdu_model, repo_url
1157 )
1158 )
1159
1160 return await self._exec_inspect_command(
1161 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1162 )
1163
1164 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1165
1166 self.log.debug(
1167 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1168 )
1169
1170 return await self._exec_inspect_command(
1171 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1172 )
1173
1174 async def synchronize_repos(self, cluster_uuid: str):
1175
1176 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1177 try:
1178 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1179 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1180
1181 local_repo_list = await self.repo_list(cluster_uuid)
1182 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1183
1184 deleted_repo_list = []
1185 added_repo_dict = {}
1186
1187 # iterate over the list of repos in the database that should be
1188 # added if not present
1189 for repo_name, db_repo in db_repo_dict.items():
1190 try:
1191 # check if it is already present
1192 curr_repo_url = local_repo_dict.get(db_repo["name"])
1193 repo_id = db_repo.get("_id")
1194 if curr_repo_url != db_repo["url"]:
1195 if curr_repo_url:
1196 self.log.debug(
1197 "repo {} url changed, delete and and again".format(
1198 db_repo["url"]
1199 )
1200 )
1201 await self.repo_remove(cluster_uuid, db_repo["name"])
1202 deleted_repo_list.append(repo_id)
1203
1204 # add repo
1205 self.log.debug("add repo {}".format(db_repo["name"]))
1206 if "ca_cert" in db_repo:
1207 await self.repo_add(
1208 cluster_uuid,
1209 db_repo["name"],
1210 db_repo["url"],
1211 cert=db_repo["ca_cert"],
1212 )
1213 else:
1214 await self.repo_add(
1215 cluster_uuid,
1216 db_repo["name"],
1217 db_repo["url"],
1218 )
1219 added_repo_dict[repo_id] = db_repo["name"]
1220 except Exception as e:
1221 raise K8sException(
1222 "Error adding repo id: {}, err_msg: {} ".format(
1223 repo_id, repr(e)
1224 )
1225 )
1226
1227 # Delete repos that are present but not in nbi_list
1228 for repo_name in local_repo_dict:
1229 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1230 self.log.debug("delete repo {}".format(repo_name))
1231 try:
1232 await self.repo_remove(cluster_uuid, repo_name)
1233 deleted_repo_list.append(repo_name)
1234 except Exception as e:
1235 self.warning(
1236 "Error deleting repo, name: {}, err_msg: {}".format(
1237 repo_name, str(e)
1238 )
1239 )
1240
1241 return deleted_repo_list, added_repo_dict
1242
1243 except K8sException:
1244 raise
1245 except Exception as e:
1246 # Do not raise errors synchronizing repos
1247 self.log.error("Error synchronizing repos: {}".format(e))
1248 raise Exception("Error synchronizing repos: {}".format(e))
1249
1250 def _get_db_repos_dict(self, repo_ids: list):
1251 db_repos_dict = {}
1252 for repo_id in repo_ids:
1253 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1254 db_repos_dict[db_repo["name"]] = db_repo
1255 return db_repos_dict
1256
1257 """
1258 ####################################################################################
1259 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1260 ####################################################################################
1261 """
1262
    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns base cluster and kube dirs and returns them.
        Also creates helm3 dirs according to new directory specification; those paths
        are not returned but assigned to helm environment variables

        :param cluster_name: cluster_name
        :param create_if_not_exist: create the directories when they do not exist
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
1273
    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization

        :param cluster_id: id of the cluster to initialize
        :param namespace: K8s namespace to use for the helm installation
        :param paths: cluster config paths dictionary (see _init_paths_env)
        :param env: helm environment variables (see _init_paths_env)
        """
1279
    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent helm instances list

        :param cluster_id: id of the cluster whose releases are listed
        :return: list of dictionaries, each describing a helm release (callers
            rely at least on the "name" and "namespace" keys)
        """
1285
    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        """
        Implements the helm version dependent method to obtain services from a helm instance

        :param cluster_id: id of the cluster where the instance is deployed
        :param kdu_instance: name of the helm release
        :param namespace: K8s namespace of the instance
        :param kubeconfig: Kubeconfig file path
        """
1291
    @abc.abstractmethod
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """
        Implements the helm version dependent method to obtain status of a helm instance

        :param cluster_id: id of the cluster where the instance is deployed
        :param kdu_instance: name of the helm release
        :param namespace: K8s namespace of the instance (None for the default one)
        :param yaml_format: if True, return a YAML string; otherwise a dictionary
        :param show_error_log: if True, log the output when the command fails
        :return: status of the helm release, as a YAML string or a dictionary
        """
1304
    @abc.abstractmethod
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """Generates the command to install a Helm Chart release

        (The previous docstring said "delete"; this method builds the install
        command.)

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance will be deployed
            params_str (str): Params used to install the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, the installation rolls back on failure.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to install a Helm Chart release
        """
1320
    @abc.abstractmethod
    def _get_upgrade_scale_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        count,
        version,
        atomic,
        replicas,
        timeout,
        resource_name,
        kubeconfig,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            count (int): Scale count
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replicas (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """
1353
    @abc.abstractmethod
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
        force,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """
1383
    @abc.abstractmethod
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """
        Obtain command to be executed to rollback the indicated instance

        :param kdu_instance: name of the helm release to roll back
        :param namespace: K8s namespace where the release is deployed
        :param revision: revision to roll back to
        :param kubeconfig: Kubeconfig file path
        :return: the helm rollback command string
        """
1391
    @abc.abstractmethod
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """
        Obtain command to be executed to delete the indicated instance

        :param kdu_instance: name of the helm release to delete
        :param namespace: K8s namespace where the release is deployed
        :param kubeconfig: Kubeconfig file path
        :return: the helm uninstall/delete command string
        """
1399
    @abc.abstractmethod
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generates the command to obtain the information about an Helm Chart package
        (´helm show ...´ command)

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of an Helm Chart
            repo_str: pre-built " --repo <url>" option string, or "" when no
                repository is used (see _exec_inspect_command)
            version: pre-built "--version <x>" option string, or "" when there
                is no version constraint

        Returns:
            str: the generated Helm Chart command
        """
1416
    @abc.abstractmethod
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance.

        :param get_command: the `helm get` sub-command (e.g. "values")
        :param kdu_instance: name of the helm release
        :param namespace: K8s namespace where the release is deployed
        :param kubeconfig: Kubeconfig file path
        :return: the helm get command string
        """
1422
    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method call to uninstall cluster software for helm. This method is dependent
        of helm version
        For Helm v2 it will be called when Tiller must be uninstalled
        For Helm v3 it does nothing and does not need to be called

        :param cluster_id: id of the cluster
        :param namespace: K8s namespace used by the cluster software
        """
1431
    @abc.abstractmethod
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the cluster repos identifiers

        :param cluster_uuid: UUID of the K8s cluster known by OSM
        :return: list of repo ids (used as "_id" lookups in the "k8srepos"
            collection, see _get_db_repos_dict)
        """
1437
1438 """
1439 ####################################################################################
1440 ################################### P R I V A T E ##################################
1441 ####################################################################################
1442 """
1443
1444 @staticmethod
1445 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1446 if os.path.exists(filename):
1447 return True
1448 else:
1449 msg = "File {} does not exist".format(filename)
1450 if exception_if_not_exists:
1451 raise K8sException(msg)
1452
1453 @staticmethod
1454 def _remove_multiple_spaces(strobj):
1455 strobj = strobj.strip()
1456 while " " in strobj:
1457 strobj = strobj.replace(" ", " ")
1458 return strobj
1459
1460 @staticmethod
1461 def _output_to_lines(output: str) -> list:
1462 output_lines = list()
1463 lines = output.splitlines(keepends=False)
1464 for line in lines:
1465 line = line.strip()
1466 if len(line) > 0:
1467 output_lines.append(line)
1468 return output_lines
1469
1470 @staticmethod
1471 def _output_to_table(output: str) -> list:
1472 output_table = list()
1473 lines = output.splitlines(keepends=False)
1474 for line in lines:
1475 line = line.replace("\t", " ")
1476 line_list = list()
1477 output_table.append(line_list)
1478 cells = line.split(sep=" ")
1479 for cell in cells:
1480 cell = cell.strip()
1481 if len(cell) > 0:
1482 line_list.append(cell)
1483 return output_table
1484
1485 @staticmethod
1486 def _parse_services(output: str) -> list:
1487 lines = output.splitlines(keepends=False)
1488 services = []
1489 for line in lines:
1490 line = line.replace("\t", " ")
1491 cells = line.split(sep=" ")
1492 if len(cells) > 0 and cells[0].startswith("service/"):
1493 elems = cells[0].split(sep="/")
1494 if len(elems) > 1:
1495 services.append(elems[1])
1496 return services
1497
1498 @staticmethod
1499 def _get_deep(dictionary: dict, members: tuple):
1500 target = dictionary
1501 value = None
1502 try:
1503 for m in members:
1504 value = target.get(m)
1505 if not value:
1506 return None
1507 else:
1508 target = value
1509 except Exception:
1510 pass
1511 return value
1512
1513 # find key:value in several lines
1514 @staticmethod
1515 def _find_in_lines(p_lines: list, p_key: str) -> str:
1516 for line in p_lines:
1517 try:
1518 if line.startswith(p_key + ":"):
1519 parts = line.split(":")
1520 the_value = parts[1].strip()
1521 return the_value
1522 except Exception:
1523 # ignore it
1524 pass
1525 return None
1526
1527 @staticmethod
1528 def _lower_keys_list(input_list: list):
1529 """
1530 Transform the keys in a list of dictionaries to lower case and returns a new list
1531 of dictionaries
1532 """
1533 new_list = []
1534 if input_list:
1535 for dictionary in input_list:
1536 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1537 new_list.append(new_dict)
1538 return new_list
1539
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ) -> (str, int):
        """Run a local command asynchronously and capture its output.

        :param command: the command line to run (normalized and shlex-split)
        :param raise_exception_on_error: raise K8sException on non-zero return code
        :param show_error_log: log the output when the command fails
        :param encode_utf8: re-encode the output as utf-8 and normalize "\\n"
        :param env: extra environment variables merged over os.environ
        :return: tuple (output, return_code); ("", -1) on unexpected errors
            when raise_exception_on_error is False
        """

        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command = shlex.split(command)

        # child environment: current process env overridden by the extra vars
        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # cmd_lock serializes subprocess execution across coroutines
            async with self.cmd_lock:
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=environ,
                )

                # wait for command terminate
                stdout, stderr = await process.communicate()

                return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # NOTE(review): stderr replaces any stdout captured above -
                # confirm this precedence is intended
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # first, kill the process if it is still running
            if process.returncode is None:
                process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1613
    async def _local_async_exec_pipe(
        self,
        command1: str,
        command2: str,
        raise_exception_on_error: bool = True,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ):
        """Run two local commands connected by a pipe (command1 | command2).

        The output and return code are those of the second command.

        :param command1: producer command line
        :param command2: consumer command line
        :param raise_exception_on_error: raise K8sException on non-zero return code
        :param show_error_log: log the output when the pipeline fails
        :param encode_utf8: re-encode the output as utf-8 and normalize "\\n"
        :param env: extra environment variables merged over os.environ
        :return: tuple (output, return_code); ("", -1) on unexpected errors
            when raise_exception_on_error is False
        """

        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        # child environment: current process env overridden by the extra vars
        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # cmd_lock serializes subprocess execution across coroutines
            async with self.cmd_lock:
                # connect command1's stdout to command2's stdin via an OS pipe;
                # each end is closed in the parent once handed to the child
                read, write = os.pipe()
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
                os.close(read)
                stdout, stderr = await process_2.communicate()

                return_code = process_2.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # NOTE(review): stderr replaces any stdout captured above -
                # confirm this precedence is intended
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # first, kill the processes if they are still running
            for process in (process_1, process_2):
                if process.returncode is None:
                    process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1692
1693 async def _get_service(self, cluster_id, service_name, namespace):
1694 """
1695 Obtains the data of the specified service in the k8cluster.
1696
1697 :param cluster_id: id of a K8s cluster known by OSM
1698 :param service_name: name of the K8s service in the specified namespace
1699 :param namespace: K8s namespace used by the KDU instance
1700 :return: If successful, it will return a service with the following data:
1701 - `name` of the service
1702 - `type` type of service in the k8 cluster
1703 - `ports` List of ports offered by the service, for each port includes at least
1704 name, port, protocol
1705 - `cluster_ip` Internal ip to be used inside k8s cluster
1706 - `external_ip` List of external ips (in case they are available)
1707 """
1708
1709 # init config, env
1710 paths, env = self._init_paths_env(
1711 cluster_name=cluster_id, create_if_not_exist=True
1712 )
1713
1714 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1715 self.kubectl_command, paths["kube_config"], namespace, service_name
1716 )
1717
1718 output, _rc = await self._local_async_exec(
1719 command=command, raise_exception_on_error=True, env=env
1720 )
1721
1722 data = yaml.load(output, Loader=yaml.SafeLoader)
1723
1724 service = {
1725 "name": service_name,
1726 "type": self._get_deep(data, ("spec", "type")),
1727 "ports": self._get_deep(data, ("spec", "ports")),
1728 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1729 }
1730 if service["type"] == "LoadBalancer":
1731 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1732 ip_list = [elem["ip"] for elem in ip_map_list]
1733 service["external_ip"] = ip_list
1734
1735 return service
1736
1737 async def _exec_get_command(
1738 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1739 ):
1740 """Obtains information about the kdu instance."""
1741
1742 full_command = self._get_get_command(
1743 get_command, kdu_instance, namespace, kubeconfig
1744 )
1745
1746 output, _rc = await self._local_async_exec(command=full_command)
1747
1748 return output
1749
1750 async def _exec_inspect_command(
1751 self, inspect_command: str, kdu_model: str, repo_url: str = None
1752 ):
1753 """Obtains information about an Helm Chart package (´helm show´ command)
1754
1755 Args:
1756 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1757 kdu_model: The name or path of an Helm Chart
1758 repo_url: Helm Chart repository url
1759
1760 Returns:
1761 str: the requested info about the Helm Chart package
1762 """
1763
1764 repo_str = ""
1765 if repo_url:
1766 repo_str = " --repo {}".format(repo_url)
1767
1768 # Obtain the Chart's name and store it in the var kdu_model
1769 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1770
1771 kdu_model, version = self._split_version(kdu_model)
1772 if version:
1773 version_str = "--version {}".format(version)
1774 else:
1775 version_str = ""
1776
1777 full_command = self._get_inspect_command(
1778 show_command=inspect_command,
1779 kdu_model=kdu_model,
1780 repo_str=repo_str,
1781 version=version_str,
1782 )
1783
1784 output, _ = await self._local_async_exec(command=full_command)
1785
1786 return output
1787
    async def _get_replica_count_url(
        self,
        kdu_model: str,
        repo_url: str = None,
        resource_name: str = None,
    ) -> (int, str):
        """Get the replica count value in the Helm Chart Values.

        Args:
            kdu_model: The name or path of an Helm Chart
            repo_url: Helm Chart repository url
            resource_name: Resource name

        Returns:
            A tuple with:
            - The number of replicas of the specific instance; if not found, returns None; and
            - The string corresponding to the replica count key in the Helm values

        Raises:
            K8sException: when no values are found, when neither replica key is
                present, or when both "replicaCount" and "replicas" coexist
        """

        kdu_values = yaml.load(
            await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
            Loader=yaml.SafeLoader,
        )

        self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

        if not kdu_values:
            raise K8sException(
                "kdu_values not found for kdu_model {}".format(kdu_model)
            )

        # when a resource is given, the replica keys are looked up inside its
        # sub-dictionary of the values
        if resource_name:
            kdu_values = kdu_values.get(resource_name, None)

        if not kdu_values:
            msg = "resource {} not found in the values in model {}".format(
                resource_name, kdu_model
            )
            self.log.error(msg)
            raise K8sException(msg)

        # duplicate_check is set when the count was found under "replicas", so
        # the later check can detect a coexisting "replicaCount" key
        duplicate_check = False

        replica_str = ""
        replicas = None

        if kdu_values.get("replicaCount") is not None:
            replicas = kdu_values["replicaCount"]
            replica_str = "replicaCount"
        elif kdu_values.get("replicas") is not None:
            duplicate_check = True
            replicas = kdu_values["replicas"]
            replica_str = "replicas"
        else:
            if resource_name:
                msg = (
                    "replicaCount or replicas not found in the resource"
                    "{} values in model {}. Cannot be scaled".format(
                        resource_name, kdu_model
                    )
                )
            else:
                msg = (
                    "replicaCount or replicas not found in the values"
                    "in model {}. Cannot be scaled".format(kdu_model)
                )
            self.log.error(msg)
            raise K8sException(msg)

        # Control if replicas and replicaCount exists at the same time
        msg = "replicaCount and replicas are exists at the same time"
        if duplicate_check:
            # count came from "replicas": any "replicaCount" key (even None) is a conflict
            if "replicaCount" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)
        else:
            # count came from "replicaCount": any "replicas" key is a conflict
            if "replicas" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)

        return replicas, replica_str
1869
    async def _get_replica_count_instance(
        self,
        kdu_instance: str,
        namespace: str,
        kubeconfig: str,
        resource_name: str = None,
    ) -> int:
        """Get the replica count value in the instance.

        Args:
            kdu_instance: The name of the KDU instance
            namespace: KDU instance namespace
            kubeconfig: Kubeconfig file path
            resource_name: Resource name

        Returns:
            The number of replicas of the specific instance; if not found, returns None
        """

        kdu_values = yaml.load(
            await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
            Loader=yaml.SafeLoader,
        )

        self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")

        replicas = None

        if kdu_values:
            # when a resource_name is given, the replica keys are looked up in
            # its sub-dictionary of the values
            resource_values = (
                kdu_values.get(resource_name, None) if resource_name else None
            )

            # "replicaCount" takes precedence over "replicas"
            for replica_str in ("replicaCount", "replicas"):
                if resource_values:
                    replicas = resource_values.get(replica_str)
                else:
                    replicas = kdu_values.get(replica_str)

                if replicas is not None:
                    break

        return replicas
1913
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        namespace: str = None,
        db_dict: dict = None,
    ) -> None:
        """
        Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

        :param cluster_id (str): the cluster where the KDU instance is deployed
        :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
        :param kdu_instance (str): The KDU instance in relation to which the status is obtained
        :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
        :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
            values for the keys:
            - "collection": The Mongo DB collection to write to
            - "filter": The query filter to use in the update process
            - "path": The dot separated keys which targets the object to be updated
            Defaults to None.
        """

        # never propagate exceptions: status storing is best-effort
        try:
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )

            # the short status is the helm release description
            status = detailed_status.get("info").get("description")
            self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )

            if not result:
                self.log.info("Error writing in database. Task exiting...")

        except asyncio.CancelledError as e:
            self.log.warning(
                f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
            )
        except Exception as e:
            self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1965
1966 # params for use in -f file
1967 # returns values file option and filename (in order to delete it at the end)
1968 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1969
1970 if params and len(params) > 0:
1971 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1972
1973 def get_random_number():
1974 r = random.randrange(start=1, stop=99999999)
1975 s = str(r)
1976 while len(s) < 10:
1977 s = "0" + s
1978 return s
1979
1980 params2 = dict()
1981 for key in params:
1982 value = params.get(key)
1983 if "!!yaml" in str(value):
1984 value = yaml.safe_load(value[7:])
1985 params2[key] = value
1986
1987 values_file = get_random_number() + ".yaml"
1988 with open(values_file, "w") as stream:
1989 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1990
1991 return "-f {}".format(values_file), values_file
1992
1993 return "", None
1994
1995 # params for use in --set option
1996 @staticmethod
1997 def _params_to_set_option(params: dict) -> str:
1998 params_str = ""
1999 if params and len(params) > 0:
2000 start = True
2001 for key in params:
2002 value = params.get(key, None)
2003 if value is not None:
2004 if start:
2005 params_str += "--set "
2006 start = False
2007 else:
2008 params_str += ","
2009 params_str += "{}={}".format(key, value)
2010 return params_str
2011
2012 @staticmethod
2013 def generate_kdu_instance_name(**kwargs):
2014 chart_name = kwargs["kdu_model"]
2015 # check embeded chart (file or dir)
2016 if chart_name.startswith("/"):
2017 # extract file or directory name
2018 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2019 # check URL
2020 elif "://" in chart_name:
2021 # extract last portion of URL
2022 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2023
2024 name = ""
2025 for c in chart_name:
2026 if c.isalpha() or c.isnumeric():
2027 name += c
2028 else:
2029 name += "-"
2030 if len(name) > 35:
2031 name = name[0:35]
2032
2033 # if does not start with alpha character, prefix 'a'
2034 if not name[0].isalpha():
2035 name = "a" + name
2036
2037 name += "-"
2038
2039 def get_random_number():
2040 r = random.randrange(start=1, stop=99999999)
2041 s = str(r)
2042 s = s.rjust(10, "0")
2043 return s
2044
2045 name = name + get_random_number()
2046 return name.lower()
2047
    def _split_version(self, kdu_model: str) -> (str, str):
        """Split a "<chart>:<version>" kdu model into (chart, version).

        Local chart files/dirs are never split; when there is not exactly one
        ":" the model is returned unchanged with version None.
        NOTE(review): a model with extra colons (e.g. a URL with a port) is
        therefore returned unsplit - confirm this is intended.
        """
        version = None
        if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]
        return kdu_model, version
2056
2057 def _split_repo(self, kdu_model: str) -> (str, str):
2058 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2059
2060 Args:
2061 kdu_model (str): Associated KDU model
2062
2063 Returns:
2064 (str, str): Tuple with the Chart name in index 0, and the repo name
2065 in index 2; if there was a problem finding them, return None
2066 for both
2067 """
2068
2069 chart_name = None
2070 repo_name = None
2071
2072 idx = kdu_model.find("/")
2073 if idx >= 0:
2074 chart_name = kdu_model[idx + 1 :]
2075 repo_name = kdu_model[:idx]
2076
2077 return chart_name, repo_name
2078
2079 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2080 """Obtain the Helm repository for an Helm Chart
2081
2082 Args:
2083 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2084 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2085
2086 Returns:
2087 str: the repository URL; if Helm Chart is a local one, the function returns None
2088 """
2089
2090 _, repo_name = self._split_repo(kdu_model=kdu_model)
2091
2092 repo_url = None
2093 if repo_name:
2094 # Find repository link
2095 local_repo_list = await self.repo_list(cluster_uuid)
2096 for repo in local_repo_list:
2097 if repo["name"] == repo_name:
2098 repo_url = repo["url"]
2099 break # it is not necessary to continue the loop if the repo link was found...
2100
2101 return repo_url