1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 def _get_namespace(self, cluster_uuid: str) -> str:
94 """
95 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 :param cluster_uuid: cluster's uuid
98 """
99
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
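# Illustrative note (an assumption about the DB record shape; only the fields
# used here are shown): a "k8sclusters" document is expected to look like
#   {"_id": "<cluster uuid>", "namespace": "kube-system", ...}
# and this helper simply returns its "namespace" field.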
105
106 async def init_env(
107 self,
108 k8s_creds: str,
109 namespace: str = "kube-system",
110 reuse_cluster_uuid=None,
111 **kwargs,
112 ) -> (str, bool):
113 """
114 It prepares a given K8s cluster environment to run Charts
115
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 '.kube/config'
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
125 """
126
127 if reuse_cluster_uuid:
128 cluster_id = reuse_cluster_uuid
129 else:
130 cluster_id = str(uuid4())
131
132 self.log.debug(
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
134 )
135
136 paths, env = self._init_paths_env(
137 cluster_name=cluster_id, create_if_not_exist=True
138 )
139 # open() takes buffering, not file permissions, as its third positional
140 # argument, so restrict the kubeconfig to owner read/write with chmod instead
141 with open(paths["kube_config"], "w") as f:
142 f.write(k8s_creds)
143 os.chmod(paths["kube_config"], stat.S_IRUSR | stat.S_IWUSR)
143
144 # Code with initialization specific to the helm version
145 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
146
147 # sync fs with local data
148 self.fs.reverse_sync(from_path=cluster_id)
149
150 self.log.info("Cluster {} initialized".format(cluster_id))
151
152 return cluster_id, n2vc_installed_sw
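# Usage sketch (illustrative only; "connector" stands for a concrete helm
# connector subclass and "kubeconfig_text" for credentials read elsewhere):
#
#   cluster_id, installed_sw = await connector.init_env(
#       k8s_creds=kubeconfig_text, namespace="kube-system"
#   )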
153
154 async def repo_add(
155 self,
156 cluster_uuid: str,
157 name: str,
158 url: str,
159 repo_type: str = "chart",
160 cert: str = None,
161 user: str = None,
162 password: str = None,
163 ):
164 self.log.debug(
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid, repo_type, name, url
167 )
168 )
169
170 # init_env
171 paths, env = self._init_paths_env(
172 cluster_name=cluster_uuid, create_if_not_exist=True
173 )
174
175 # sync local dir
176 self.fs.sync(from_path=cluster_uuid)
177
178 # helm repo add name url
179 command = ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths["kube_config"], self._helm_command, name, url
181 )
182
183 if cert:
184 temp_cert_file = os.path.join(
185 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
186 )
187 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
188 with open(temp_cert_file, "w") as the_cert:
189 the_cert.write(cert)
190 command += " --ca-file {}".format(temp_cert_file)
191
192 if user:
193 command += " --username={}".format(user)
194
195 if password:
196 command += " --password={}".format(password)
197
198 self.log.debug("adding repo: {}".format(command))
199 await self._local_async_exec(
200 command=command, raise_exception_on_error=True, env=env
201 )
202
203 # helm repo update
204 command = "env KUBECONFIG={} {} repo update {}".format(
205 paths["kube_config"], self._helm_command, name
206 )
207 self.log.debug("updating repo: {}".format(command))
208 await self._local_async_exec(
209 command=command, raise_exception_on_error=False, env=env
210 )
211
212 # sync fs
213 self.fs.reverse_sync(from_path=cluster_uuid)
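# Usage sketch (illustrative; the repository name, URL and credentials are
# placeholders, not values defined by this module):
#
#   await connector.repo_add(
#       cluster_uuid, "bitnami", "https://charts.bitnami.com/bitnami",
#       user="deploy", password="secret"
#   )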
214
215 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
216 self.log.debug(
217 "Cluster {}, updating {} repository {}".format(
218 cluster_uuid, repo_type, name
219 )
220 )
221
222 # init_env
223 paths, env = self._init_paths_env(
224 cluster_name=cluster_uuid, create_if_not_exist=True
225 )
226
227 # sync local dir
228 self.fs.sync(from_path=cluster_uuid)
229
230 # helm repo update
231 command = "{} repo update {}".format(self._helm_command, name)
232 self.log.debug("updating repo: {}".format(command))
233 await self._local_async_exec(
234 command=command, raise_exception_on_error=False, env=env
235 )
236
237 # sync fs
238 self.fs.reverse_sync(from_path=cluster_uuid)
239
240 async def repo_list(self, cluster_uuid: str) -> list:
241 """
242 Get the list of registered repositories
243
244 :return: list of registered repositories: [ (name, url) .... ]
245 """
246
247 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
248
249 # config filename
250 paths, env = self._init_paths_env(
251 cluster_name=cluster_uuid, create_if_not_exist=True
252 )
253
254 # sync local dir
255 self.fs.sync(from_path=cluster_uuid)
256
257 command = "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths["kube_config"], self._helm_command
259 )
260
261 # Do not raise an exception: if there are no repos we just want an empty list
262 output, _rc = await self._local_async_exec(
263 command=command, raise_exception_on_error=False, env=env
264 )
265
266 # sync fs
267 self.fs.reverse_sync(from_path=cluster_uuid)
268
269 if _rc == 0:
270 if output and len(output) > 0:
271 repos = yaml.load(output, Loader=yaml.SafeLoader)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self._lower_keys_list(repos)
274 else:
275 return []
276 else:
277 return []
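# Illustrative output (assuming helm reports at least "name" and "url" per
# repository): [{"name": "stable", "url": "https://charts.helm.sh/stable"}, ...]
# with all keys lowercased by _lower_keys_list to unify helm2/helm3 output.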
278
279 async def repo_remove(self, cluster_uuid: str, name: str):
280 self.log.debug(
281 "remove {} repositories for cluster {}".format(name, cluster_uuid)
282 )
283
284 # init env, paths
285 paths, env = self._init_paths_env(
286 cluster_name=cluster_uuid, create_if_not_exist=True
287 )
288
289 # sync local dir
290 self.fs.sync(from_path=cluster_uuid)
291
292 command = "env KUBECONFIG={} {} repo remove {}".format(
293 paths["kube_config"], self._helm_command, name
294 )
295 await self._local_async_exec(
296 command=command, raise_exception_on_error=True, env=env
297 )
298
299 # sync fs
300 self.fs.reverse_sync(from_path=cluster_uuid)
301
302 async def reset(
303 self,
304 cluster_uuid: str,
305 force: bool = False,
306 uninstall_sw: bool = False,
307 **kwargs,
308 ) -> bool:
309 """Reset a cluster
310
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
312
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to uninstall the software installed in the cluster by the connector (e.g. Tiller)
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
318 """
319 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
320 self.log.debug(
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid, uninstall_sw
323 )
324 )
325
326 # sync local dir
327 self.fs.sync(from_path=cluster_uuid)
328
329 # uninstall releases if needed.
330 if uninstall_sw:
331 releases = await self.instances_list(cluster_uuid=cluster_uuid)
332 if len(releases) > 0:
333 if force:
334 for r in releases:
335 try:
336 kdu_instance = r.get("name")
337 chart = r.get("chart")
338 self.log.debug(
339 "Uninstalling {} -> {}".format(chart, kdu_instance)
340 )
341 await self.uninstall(
342 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
343 )
344 except Exception as e:
345 # do not raise an exception: it was found that uninstalling
346 # previously installed helm releases raised an error in some
347 # cases, so only a warning is logged
348 self.log.warn(
349 "Error uninstalling release {}: {}".format(
350 kdu_instance, e
351 )
352 )
353 else:
354 msg = (
355 "Cluster uuid: {} has releases and force is not set. Leaving the K8s helm environment"
356 ).format(cluster_uuid)
357 self.log.warn(msg)
358 uninstall_sw = (
359 False # Allow to remove k8s cluster without removing Tiller
360 )
361
362 if uninstall_sw:
363 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
364
365 # delete cluster directory
366 self.log.debug("Removing directory {}".format(cluster_uuid))
367 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
368 # Also remove the local directory if it still exists
369 direct = self.fs.path + "/" + cluster_uuid
370 shutil.rmtree(direct, ignore_errors=True)
371
372 return True
373
374 def _is_helm_chart_a_file(self, chart_name: str):
375 return chart_name.count("/") > 1
376
377 async def _install_impl(
378 self,
379 cluster_id: str,
380 kdu_model: str,
381 paths: dict,
382 env: dict,
383 kdu_instance: str,
384 atomic: bool = True,
385 timeout: float = 300,
386 params: dict = None,
387 db_dict: dict = None,
388 kdu_name: str = None,
389 namespace: str = None,
390 ):
391 # init env, paths
392 paths, env = self._init_paths_env(
393 cluster_name=cluster_id, create_if_not_exist=True
394 )
395
396 # params to str
397 params_str, file_to_delete = self._params_to_file_option(
398 cluster_id=cluster_id, params=params
399 )
400
401 # version
402 kdu_model, version = self._split_version(kdu_model)
403
404 repo = await self._split_repo(kdu_model)
405 if repo:
406 await self.repo_update(cluster_id, repo)
407
408 command = self._get_install_command(
409 kdu_model,
410 kdu_instance,
411 namespace,
412 params_str,
413 version,
414 atomic,
415 timeout,
416 paths["kube_config"],
417 )
418
419 self.log.debug("installing: {}".format(command))
420
421 if atomic:
422 # exec helm in a task
423 exec_task = asyncio.ensure_future(
424 coro_or_future=self._local_async_exec(
425 command=command, raise_exception_on_error=False, env=env
426 )
427 )
428
429 # write status in another task
430 status_task = asyncio.ensure_future(
431 coro_or_future=self._store_status(
432 cluster_id=cluster_id,
433 kdu_instance=kdu_instance,
434 namespace=namespace,
435 db_dict=db_dict,
436 operation="install",
437 )
438 )
439
440 # wait for execution task
441 await asyncio.wait([exec_task])
442
443 # cancel status task
444 status_task.cancel()
445
446 output, rc = exec_task.result()
447
448 else:
449
450 output, rc = await self._local_async_exec(
451 command=command, raise_exception_on_error=False, env=env
452 )
453
454 # remove temporal values yaml file
455 if file_to_delete:
456 os.remove(file_to_delete)
457
458 # write final status
459 await self._store_status(
460 cluster_id=cluster_id,
461 kdu_instance=kdu_instance,
462 namespace=namespace,
463 db_dict=db_dict,
464 operation="install",
465 )
466
467 if rc != 0:
468 msg = "Error executing command: {}\nOutput: {}".format(command, output)
469 self.log.error(msg)
470 raise K8sException(msg)
471
472 async def upgrade(
473 self,
474 cluster_uuid: str,
475 kdu_instance: str,
476 kdu_model: str = None,
477 atomic: bool = True,
478 timeout: float = 300,
479 params: dict = None,
480 db_dict: dict = None,
481 ):
482 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
483
484 # sync local dir
485 self.fs.sync(from_path=cluster_uuid)
486
487 # look for instance to obtain namespace
488 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
489 if not instance_info:
490 raise K8sException("kdu_instance {} not found".format(kdu_instance))
491
492 # init env, paths
493 paths, env = self._init_paths_env(
494 cluster_name=cluster_uuid, create_if_not_exist=True
495 )
496
497 # sync local dir
498 self.fs.sync(from_path=cluster_uuid)
499
500 # params to str
501 params_str, file_to_delete = self._params_to_file_option(
502 cluster_id=cluster_uuid, params=params
503 )
504
505 # version
506 kdu_model, version = self._split_version(kdu_model)
507
508 repo = await self._split_repo(kdu_model)
509 if repo:
510 await self.repo_update(cluster_uuid, repo)
511
512 command = self._get_upgrade_command(
513 kdu_model,
514 kdu_instance,
515 instance_info["namespace"],
516 params_str,
517 version,
518 atomic,
519 timeout,
520 paths["kube_config"],
521 )
522
523 self.log.debug("upgrading: {}".format(command))
524
525 if atomic:
526
527 # exec helm in a task
528 exec_task = asyncio.ensure_future(
529 coro_or_future=self._local_async_exec(
530 command=command, raise_exception_on_error=False, env=env
531 )
532 )
533 # write status in another task
534 status_task = asyncio.ensure_future(
535 coro_or_future=self._store_status(
536 cluster_id=cluster_uuid,
537 kdu_instance=kdu_instance,
538 namespace=instance_info["namespace"],
539 db_dict=db_dict,
540 operation="upgrade",
541 )
542 )
543
544 # wait for execution task
545 await asyncio.wait([exec_task])
546
547 # cancel status task
548 status_task.cancel()
549 output, rc = exec_task.result()
550
551 else:
552
553 output, rc = await self._local_async_exec(
554 command=command, raise_exception_on_error=False, env=env
555 )
556
557 # remove temporal values yaml file
558 if file_to_delete:
559 os.remove(file_to_delete)
560
561 # write final status
562 await self._store_status(
563 cluster_id=cluster_uuid,
564 kdu_instance=kdu_instance,
565 namespace=instance_info["namespace"],
566 db_dict=db_dict,
567 operation="upgrade",
568 )
569
570 if rc != 0:
571 msg = "Error executing command: {}\nOutput: {}".format(command, output)
572 self.log.error(msg)
573 raise K8sException(msg)
574
575 # sync fs
576 self.fs.reverse_sync(from_path=cluster_uuid)
577
578 # return new revision number
579 instance = await self.get_instance_info(
580 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
581 )
582 if instance:
583 revision = int(instance.get("revision"))
584 self.log.debug("New revision: {}".format(revision))
585 return revision
586 else:
587 return 0
588
589 async def scale(
590 self,
591 kdu_instance: str,
592 scale: int,
593 resource_name: str,
594 total_timeout: float = 1800,
595 cluster_uuid: str = None,
596 kdu_model: str = None,
597 atomic: bool = True,
598 db_dict: dict = None,
599 **kwargs,
600 ):
601 """Scale a resource in a Helm Chart.
602
603 Args:
604 kdu_instance: KDU instance name
605 scale: Scale to which to set the resource
606 resource_name: Resource name
607 total_timeout: The time, in seconds, to wait
608 cluster_uuid: The UUID of the cluster
609 kdu_model: The chart reference
610 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
611 The --wait flag will be set automatically if --atomic is used
612 db_dict: Dictionary for any additional data
613 kwargs: Additional parameters
614
615 Returns:
616 True if successful, False otherwise
617 """
618
619 debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
620 if resource_name:
621 debug_msg = "scaling resource {} in model {} (cluster {})".format(
622 resource_name, kdu_model, cluster_uuid
623 )
624
625 self.log.debug(debug_msg)
626
627 # look for instance to obtain namespace
628 # get_instance_info function calls the sync command
629 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
630 if not instance_info:
631 raise K8sException("kdu_instance {} not found".format(kdu_instance))
632
633 # init env, paths
634 paths, env = self._init_paths_env(
635 cluster_name=cluster_uuid, create_if_not_exist=True
636 )
637
638 # version
639 kdu_model, version = self._split_version(kdu_model)
640
641 repo_url = await self._find_repo(kdu_model, cluster_uuid)
642
643 _, replica_str = await self._get_replica_count_url(
644 kdu_model, repo_url, resource_name
645 )
646
647 command = self._get_upgrade_scale_command(
648 kdu_model,
649 kdu_instance,
650 instance_info["namespace"],
651 scale,
652 version,
653 atomic,
654 replica_str,
655 total_timeout,
656 resource_name,
657 paths["kube_config"],
658 )
659
660 self.log.debug("scaling: {}".format(command))
661
662 if atomic:
663 # exec helm in a task
664 exec_task = asyncio.ensure_future(
665 coro_or_future=self._local_async_exec(
666 command=command, raise_exception_on_error=False, env=env
667 )
668 )
669 # write status in another task
670 status_task = asyncio.ensure_future(
671 coro_or_future=self._store_status(
672 cluster_id=cluster_uuid,
673 kdu_instance=kdu_instance,
674 namespace=instance_info["namespace"],
675 db_dict=db_dict,
676 operation="scale",
677 )
678 )
679
680 # wait for execution task
681 await asyncio.wait([exec_task])
682
683 # cancel status task
684 status_task.cancel()
685 output, rc = exec_task.result()
686
687 else:
688 output, rc = await self._local_async_exec(
689 command=command, raise_exception_on_error=False, env=env
690 )
691
692 # write final status
693 await self._store_status(
694 cluster_id=cluster_uuid,
695 kdu_instance=kdu_instance,
696 namespace=instance_info["namespace"],
697 db_dict=db_dict,
698 operation="scale",
699 )
700
701 if rc != 0:
702 msg = "Error executing command: {}\nOutput: {}".format(command, output)
703 self.log.error(msg)
704 raise K8sException(msg)
705
706 # sync fs
707 self.fs.reverse_sync(from_path=cluster_uuid)
708
709 return True
710
711 async def get_scale_count(
712 self,
713 resource_name: str,
714 kdu_instance: str,
715 cluster_uuid: str,
716 kdu_model: str,
717 **kwargs,
718 ) -> int:
719 """Get a resource scale count.
720
721 Args:
722 cluster_uuid: The UUID of the cluster
723 resource_name: Resource name
724 kdu_instance: KDU instance name
725 kdu_model: The name or path of an Helm Chart
726 kwargs: Additional parameters
727
728 Returns:
729 Resource instance count
730 """
731
732 self.log.debug(
733 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
734 )
735
736 # look for instance to obtain namespace
737 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
738 if not instance_info:
739 raise K8sException("kdu_instance {} not found".format(kdu_instance))
740
741 # init env, paths
742 paths, env = self._init_paths_env(
743 cluster_name=cluster_uuid, create_if_not_exist=True
744 )
745
746 replicas = await self._get_replica_count_instance(
747 kdu_instance=kdu_instance,
748 namespace=instance_info["namespace"],
749 kubeconfig=paths["kube_config"],
750 resource_name=resource_name,
751 )
752
753 # Get default value if scale count is not found from provided values
754 if not replicas:
755 repo_url = await self._find_repo(
756 kdu_model=kdu_model, cluster_uuid=cluster_uuid
757 )
758 replicas, _ = await self._get_replica_count_url(
759 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
760 )
761
762 if not replicas:
763 msg = "Replica count not found. Cannot be scaled"
764 self.log.error(msg)
765 raise K8sException(msg)
766
767 return int(replicas)
768
769 async def rollback(
770 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
771 ):
772 self.log.debug(
773 "rollback kdu_instance {} to revision {} from cluster {}".format(
774 kdu_instance, revision, cluster_uuid
775 )
776 )
777
778 # sync local dir
779 self.fs.sync(from_path=cluster_uuid)
780
781 # look for instance to obtain namespace
782 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
783 if not instance_info:
784 raise K8sException("kdu_instance {} not found".format(kdu_instance))
785
786 # init env, paths
787 paths, env = self._init_paths_env(
788 cluster_name=cluster_uuid, create_if_not_exist=True
789 )
790
791 # sync local dir
792 self.fs.sync(from_path=cluster_uuid)
793
794 command = self._get_rollback_command(
795 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
796 )
797
798 self.log.debug("rolling_back: {}".format(command))
799
800 # exec helm in a task
801 exec_task = asyncio.ensure_future(
802 coro_or_future=self._local_async_exec(
803 command=command, raise_exception_on_error=False, env=env
804 )
805 )
806 # write status in another task
807 status_task = asyncio.ensure_future(
808 coro_or_future=self._store_status(
809 cluster_id=cluster_uuid,
810 kdu_instance=kdu_instance,
811 namespace=instance_info["namespace"],
812 db_dict=db_dict,
813 operation="rollback",
814 )
815 )
816
817 # wait for execution task
818 await asyncio.wait([exec_task])
819
820 # cancel status task
821 status_task.cancel()
822
823 output, rc = exec_task.result()
824
825 # write final status
826 await self._store_status(
827 cluster_id=cluster_uuid,
828 kdu_instance=kdu_instance,
829 namespace=instance_info["namespace"],
830 db_dict=db_dict,
831 operation="rollback",
832 )
833
834 if rc != 0:
835 msg = "Error executing command: {}\nOutput: {}".format(command, output)
836 self.log.error(msg)
837 raise K8sException(msg)
838
839 # sync fs
840 self.fs.reverse_sync(from_path=cluster_uuid)
841
842 # return new revision number
843 instance = await self.get_instance_info(
844 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
845 )
846 if instance:
847 revision = int(instance.get("revision"))
848 self.log.debug("New revision: {}".format(revision))
849 return revision
850 else:
851 return 0
852
853 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
854 """
855 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
856 (this call should happen after all _terminate-config-primitive_ of the VNF
857 are invoked).
858
859 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
860 :param kdu_instance: unique name for the KDU instance to be deleted
861 :param kwargs: Additional parameters (None yet)
862 :return: True if successful
863 """
864
865 self.log.debug(
866 "uninstall kdu_instance {} from cluster {}".format(
867 kdu_instance, cluster_uuid
868 )
869 )
870
871 # sync local dir
872 self.fs.sync(from_path=cluster_uuid)
873
874 # look for instance to obtain namespace
875 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
876 if not instance_info:
877 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
878 return True
879 # init env, paths
880 paths, env = self._init_paths_env(
881 cluster_name=cluster_uuid, create_if_not_exist=True
882 )
883
884 # sync local dir
885 self.fs.sync(from_path=cluster_uuid)
886
887 command = self._get_uninstall_command(
888 kdu_instance, instance_info["namespace"], paths["kube_config"]
889 )
890 output, _rc = await self._local_async_exec(
891 command=command, raise_exception_on_error=True, env=env
892 )
893
894 # sync fs
895 self.fs.reverse_sync(from_path=cluster_uuid)
896
897 return self._output_to_table(output)
898
899 async def instances_list(self, cluster_uuid: str) -> list:
900 """
901 returns a list of deployed releases in a cluster
902
903 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
904 :return:
905 """
906
907 self.log.debug("list releases for cluster {}".format(cluster_uuid))
908
909 # sync local dir
910 self.fs.sync(from_path=cluster_uuid)
911
912 # execute internal command
913 result = await self._instances_list(cluster_uuid)
914
915 # sync fs
916 self.fs.reverse_sync(from_path=cluster_uuid)
917
918 return result
919
920 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
921 instances = await self.instances_list(cluster_uuid=cluster_uuid)
922 for instance in instances:
923 if instance.get("name") == kdu_instance:
924 return instance
925 self.log.debug("Instance {} not found".format(kdu_instance))
926 return None
927
928 async def upgrade_charm(
929 self,
930 ee_id: str = None,
931 path: str = None,
932 charm_id: str = None,
933 charm_type: str = None,
934 timeout: float = None,
935 ) -> str:
936 """This method upgrade charms in VNFs
937
938 Args:
939 ee_id: Execution environment id
940 path: Local path to the charm
941 charm_id: charm-id
942 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
943 timeout: (Float) Timeout for the ns update operation
944
945 Returns:
946 The output of the update operation if status equals to "completed"
947 """
948 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
949
950 async def exec_primitive(
951 self,
952 cluster_uuid: str = None,
953 kdu_instance: str = None,
954 primitive_name: str = None,
955 timeout: float = 300,
956 params: dict = None,
957 db_dict: dict = None,
958 **kwargs,
959 ) -> str:
960 """Exec primitive (Juju action)
961
962 :param cluster_uuid: The UUID of the cluster or namespace:cluster
963 :param kdu_instance: The unique name of the KDU instance
964 :param primitive_name: Name of action that will be executed
965 :param timeout: Timeout for action execution
966 :param params: Dictionary of all the parameters needed for the action
967 :db_dict: Dictionary for any additional data
968 :param kwargs: Additional parameters (None yet)
969
970 :return: Returns the output of the action
971 """
972 raise K8sException(
973 "KDUs deployed with Helm don't support actions "
974 "different from rollback, upgrade and status"
975 )
976
977 async def get_services(
978 self, cluster_uuid: str, kdu_instance: str, namespace: str
979 ) -> list:
980 """
981 Returns a list of services defined for the specified kdu instance.
982
983 :param cluster_uuid: UUID of a K8s cluster known by OSM
984 :param kdu_instance: unique name for the KDU instance
985 :param namespace: K8s namespace used by the KDU instance
986 :return: If successful, it will return a list of services. Each service
987 can have the following data:
988 - `name` of the service
989 - `type` of the service in the k8s cluster
990 - `ports` List of ports offered by the service, for each port includes at least
991 name, port, protocol
992 - `cluster_ip` Internal ip to be used inside k8s cluster
993 - `external_ip` List of external ips (in case they are available)
994 """
995
996 self.log.debug(
997 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
998 cluster_uuid, kdu_instance
999 )
1000 )
1001
1002 # init env, paths
1003 paths, env = self._init_paths_env(
1004 cluster_name=cluster_uuid, create_if_not_exist=True
1005 )
1006
1007 # sync local dir
1008 self.fs.sync(from_path=cluster_uuid)
1009
1010 # get list of services names for kdu
1011 service_names = await self._get_services(
1012 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1013 )
1014
1015 service_list = []
1016 for service in service_names:
1017 service = await self._get_service(cluster_uuid, service, namespace)
1018 service_list.append(service)
1019
1020 # sync fs
1021 self.fs.reverse_sync(from_path=cluster_uuid)
1022
1023 return service_list
1024
1025 async def get_service(
1026 self, cluster_uuid: str, service_name: str, namespace: str
1027 ) -> object:
1028
1029 self.log.debug(
1030 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1031 service_name, namespace, cluster_uuid
1032 )
1033 )
1034
1035 # sync local dir
1036 self.fs.sync(from_path=cluster_uuid)
1037
1038 service = await self._get_service(cluster_uuid, service_name, namespace)
1039
1040 # sync fs
1041 self.fs.reverse_sync(from_path=cluster_uuid)
1042
1043 return service
1044
1045 async def status_kdu(
1046 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1047 ) -> Union[str, dict]:
1048 """
1049 This call retrieves the current state of a given KDU instance. It allows
1050 retrieving the _composition_ (i.e. K8s objects) and the _specific values_ of
1051 the configuration parameters applied to a given instance. This call is based
1052 on the helm `status` call.
1053
1054 :param cluster_uuid: UUID of a K8s cluster known by OSM
1055 :param kdu_instance: unique name for the KDU instance
1056 :param kwargs: Additional parameters (None yet)
1057 :param yaml_format: whether the result shall be returned as a YAML string or
1058 as a dictionary
1059 :return: If successful, it will return the following vector of arguments:
1060 - K8s `namespace` in the cluster where the KDU lives
1061 - `state` of the KDU instance. It can be:
1062 - UNKNOWN
1063 - DEPLOYED
1064 - DELETED
1065 - SUPERSEDED
1066 - FAILED or
1067 - DELETING
1068 - List of `resources` (objects) that this release consists of, sorted by kind,
1069 and the status of those resources
1070 - Last `deployment_time`.
1071
1072 """
1073 self.log.debug(
1074 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1075 cluster_uuid, kdu_instance
1076 )
1077 )
1078
1079 # sync local dir
1080 self.fs.sync(from_path=cluster_uuid)
1081
1082 # get instance: needed to obtain namespace
1083 instances = await self._instances_list(cluster_id=cluster_uuid)
1084 for instance in instances:
1085 if instance.get("name") == kdu_instance:
1086 break
1087 else:
1088 # instance does not exist
1089 raise K8sException(
1090 "Instance name: {} not found in cluster: {}".format(
1091 kdu_instance, cluster_uuid
1092 )
1093 )
1094
1095 status = await self._status_kdu(
1096 cluster_id=cluster_uuid,
1097 kdu_instance=kdu_instance,
1098 namespace=instance["namespace"],
1099 yaml_format=yaml_format,
1100 show_error_log=True,
1101 )
1102
1103 # sync fs
1104 self.fs.reverse_sync(from_path=cluster_uuid)
1105
1106 return status
1107
1108 async def get_values_kdu(
1109 self, kdu_instance: str, namespace: str, kubeconfig: str
1110 ) -> str:
1111
1112 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1113
1114 return await self._exec_get_command(
1115 get_command="values",
1116 kdu_instance=kdu_instance,
1117 namespace=namespace,
1118 kubeconfig=kubeconfig,
1119 )
1120
1121 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1122 """Method to obtain the Helm Chart package's values
1123
1124 Args:
1125 kdu_model: The name or path of an Helm Chart
1126 repo_url: Helm Chart repository url
1127
1128 Returns:
1129 str: the values of the Helm Chart package
1130 """
1131
1132 self.log.debug(
1133 "inspect kdu_model values {} from (optional) repo: {}".format(
1134 kdu_model, repo_url
1135 )
1136 )
1137
1138 return await self._exec_inspect_command(
1139 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1140 )
1141
1142 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1143
1144 self.log.debug(
1145 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1146 )
1147
1148 return await self._exec_inspect_command(
1149 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1150 )
1151
1152 async def synchronize_repos(self, cluster_uuid: str):
1153
1154 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1155 try:
1156 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1157 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1158
1159 local_repo_list = await self.repo_list(cluster_uuid)
1160 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1161
1162 deleted_repo_list = []
1163 added_repo_dict = {}
1164
1165 # iterate over the list of repos in the database that should be
1166 # added if not present
1167 for repo_name, db_repo in db_repo_dict.items():
1168 try:
1169 # check if it is already present
1170 curr_repo_url = local_repo_dict.get(db_repo["name"])
1171 repo_id = db_repo.get("_id")
1172 if curr_repo_url != db_repo["url"]:
1173 if curr_repo_url:
1174 self.log.debug(
1175 "repo {} url changed, delete and add again".format(
1176 db_repo["url"]
1177 )
1178 )
1179 await self.repo_remove(cluster_uuid, db_repo["name"])
1180 deleted_repo_list.append(repo_id)
1181
1182 # add repo
1183 self.log.debug("add repo {}".format(db_repo["name"]))
1184 if "ca_cert" in db_repo:
1185 await self.repo_add(
1186 cluster_uuid,
1187 db_repo["name"],
1188 db_repo["url"],
1189 cert=db_repo["ca_cert"],
1190 )
1191 else:
1192 await self.repo_add(
1193 cluster_uuid,
1194 db_repo["name"],
1195 db_repo["url"],
1196 )
1197 added_repo_dict[repo_id] = db_repo["name"]
1198 except Exception as e:
1199 raise K8sException(
1200 "Error adding repo id: {}, err_msg: {} ".format(
1201 repo_id, repr(e)
1202 )
1203 )
1204
1205 # Delete repos that are present but not in nbi_list
1206 for repo_name in local_repo_dict:
1207 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1208 self.log.debug("delete repo {}".format(repo_name))
1209 try:
1210 await self.repo_remove(cluster_uuid, repo_name)
1211 deleted_repo_list.append(repo_name)
1212 except Exception as e:
1213 self.log.warning(
1214 "Error deleting repo, name: {}, err_msg: {}".format(
1215 repo_name, str(e)
1216 )
1217 )
1218
1219 return deleted_repo_list, added_repo_dict
1220
1221 except K8sException:
1222 raise
1223 except Exception as e:
1224 # Do not raise errors synchronizing repos
1225 self.log.error("Error synchronizing repos: {}".format(e))
1226 raise Exception("Error synchronizing repos: {}".format(e))
1227
1228 def _get_db_repos_dict(self, repo_ids: list):
1229 db_repos_dict = {}
1230 for repo_id in repo_ids:
1231 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1232 db_repos_dict[db_repo["name"]] = db_repo
1233 return db_repos_dict
1234
1235 """
1236 ####################################################################################
1237 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1238 ####################################################################################
1239 """
1240
1241 @abc.abstractmethod
1242 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1243 """
1244 Creates and returns the base cluster and kube dirs.
1245 It also creates the helm3 dirs according to the new directory specification;
1246 these paths are not returned but assigned to helm environment variables
1247
1248 :param cluster_name: cluster_name
1249 :return: Dictionary with config_paths and dictionary with helm environment variables
1250 """
1251
1252 @abc.abstractmethod
1253 async def _cluster_init(self, cluster_id, namespace, paths, env):
1254 """
1255 Implements the helm version dependent cluster initialization
1256 """
1257
1258 @abc.abstractmethod
1259 async def _instances_list(self, cluster_id):
1260 """
1261 Implements the helm version dependent helm instances list
1262 """
1263
1264 @abc.abstractmethod
1265 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1266 """
1267 Implements the helm version dependent method to obtain services from a helm instance
1268 """
1269
1270 @abc.abstractmethod
1271 async def _status_kdu(
1272 self,
1273 cluster_id: str,
1274 kdu_instance: str,
1275 namespace: str = None,
1276 yaml_format: bool = False,
1277 show_error_log: bool = False,
1278 ) -> Union[str, dict]:
1279 """
1280 Implements the helm version dependent method to obtain status of a helm instance
1281 """
1282
1283 @abc.abstractmethod
1284 def _get_install_command(
1285 self,
1286 kdu_model,
1287 kdu_instance,
1288 namespace,
1289 params_str,
1290 version,
1291 atomic,
1292 timeout,
1293 kubeconfig,
1294 ) -> str:
1295 """
1296 Obtain command to be executed to install the indicated instance
1297 """
1298
1299 @abc.abstractmethod
1300 def _get_upgrade_scale_command(
1301 self,
1302 kdu_model,
1303 kdu_instance,
1304 namespace,
1305 count,
1306 version,
1307 atomic,
1308 replicas,
1309 timeout,
1310 resource_name,
1311 kubeconfig,
1312 ) -> str:
1313 """Obtain command to be executed to upgrade the indicated instance."""
1314
1315 @abc.abstractmethod
1316 def _get_upgrade_command(
1317 self,
1318 kdu_model,
1319 kdu_instance,
1320 namespace,
1321 params_str,
1322 version,
1323 atomic,
1324 timeout,
1325 kubeconfig,
1326 ) -> str:
1327 """
1328 Obtain command to be executed to upgrade the indicated instance
1329 """
1330
1331 @abc.abstractmethod
1332 def _get_rollback_command(
1333 self, kdu_instance, namespace, revision, kubeconfig
1334 ) -> str:
1335 """
1336 Obtain command to be executed to rollback the indicated instance
1337 """
1338
1339 @abc.abstractmethod
1340 def _get_uninstall_command(
1341 self, kdu_instance: str, namespace: str, kubeconfig: str
1342 ) -> str:
1343 """
1344 Obtain command to be executed to delete the indicated instance
1345 """
1346
1347 @abc.abstractmethod
1348 def _get_inspect_command(
1349 self, show_command: str, kdu_model: str, repo_str: str, version: str
1350 ):
1351 """Generates the command to obtain the information about a Helm Chart package
1352 (`helm show ...` command)
1353
1354 Args:
1355 show_command: the second part of the command (`helm show <show_command>`)
1356 kdu_model: The name or path of a Helm Chart
1357 repo_str: Helm Chart repository option string (e.g. " --repo <url>")
1358 version: constraint with specific version of the Chart to use
1359
1360 Returns:
1361 str: the generated Helm Chart command
1362 """
1363
1364 @abc.abstractmethod
1365 def _get_get_command(
1366 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1367 ):
1368 """Obtain command to be executed to get information about the kdu instance."""
1369
1370 @abc.abstractmethod
1371 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1372 """
1373 Method called to uninstall cluster software for helm. This method depends
1374 on the helm version:
1375 For Helm v2 it will be called when Tiller must be uninstalled
1376 For Helm v3 it does nothing and does not need to be called
1377 """
1378
1379 @abc.abstractmethod
1380 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1381 """
1382 Obtains the cluster repos identifiers
1383 """
1384
1385 """
1386 ####################################################################################
1387 ################################### P R I V A T E ##################################
1388 ####################################################################################
1389 """
1390
1391 @staticmethod
1392 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1393 if os.path.exists(filename):
1394 return True
1395 else:
1396 msg = "File {} does not exist".format(filename)
1397 if exception_if_not_exists:
1398 raise K8sException(msg)
1399
1400 @staticmethod
1401 def _remove_multiple_spaces(strobj):
1402 strobj = strobj.strip()
1403 while " " in strobj:
1404 strobj = strobj.replace(" ", " ")
1405 return strobj
1406
1407 @staticmethod
1408 def _output_to_lines(output: str) -> list:
1409 output_lines = list()
1410 lines = output.splitlines(keepends=False)
1411 for line in lines:
1412 line = line.strip()
1413 if len(line) > 0:
1414 output_lines.append(line)
1415 return output_lines
1416
1417 @staticmethod
1418 def _output_to_table(output: str) -> list:
1419 output_table = list()
1420 lines = output.splitlines(keepends=False)
1421 for line in lines:
1422 line = line.replace("\t", " ")
1423 line_list = list()
1424 output_table.append(line_list)
1425 cells = line.split(sep=" ")
1426 for cell in cells:
1427 cell = cell.strip()
1428 if len(cell) > 0:
1429 line_list.append(cell)
1430 return output_table
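# Illustrative example: the two-line output "NAME\tREVISION\nmyrelease\t2"
# is parsed into [["NAME", "REVISION"], ["myrelease", "2"]]; tabs are treated
# as separators and empty cells are dropped.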
1431
1432 @staticmethod
1433 def _parse_services(output: str) -> list:
1434 lines = output.splitlines(keepends=False)
1435 services = []
1436 for line in lines:
1437 line = line.replace("\t", " ")
1438 cells = line.split(sep=" ")
1439 if len(cells) > 0 and cells[0].startswith("service/"):
1440 elems = cells[0].split(sep="/")
1441 if len(elems) > 1:
1442 services.append(elems[1])
1443 return services
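# Illustrative example: a kubectl-style listing containing the line
# "service/openldap-svc   ClusterIP   10.0.0.12 ..." yields ["openldap-svc"];
# lines whose first cell does not start with "service/" are ignored.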
1444
1445 @staticmethod
1446 def _get_deep(dictionary: dict, members: tuple):
1447 target = dictionary
1448 value = None
1449 try:
1450 for m in members:
1451 value = target.get(m)
1452 if not value:
1453 return None
1454 else:
1455 target = value
1456 except Exception:
1457 pass
1458 return value
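# Illustrative example: _get_deep(data, ("spec", "clusterIP")) returns
# data["spec"]["clusterIP"] when every intermediate key exists and is truthy,
# and None otherwise (this is how _get_service below uses it).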
1459
1460 # find key:value in several lines
1461 @staticmethod
1462 def _find_in_lines(p_lines: list, p_key: str) -> str:
1463 for line in p_lines:
1464 try:
1465 if line.startswith(p_key + ":"):
1466 parts = line.split(":")
1467 the_value = parts[1].strip()
1468 return the_value
1469 except Exception:
1470 # ignore it
1471 pass
1472 return None
1473
1474 @staticmethod
1475 def _lower_keys_list(input_list: list):
1476 """
1477 Transform the keys in a list of dictionaries to lower case and returns a new list
1478 of dictionaries
1479 """
1480 new_list = []
1481 if input_list:
1482 for dictionary in input_list:
1483 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1484 new_list.append(new_dict)
1485 return new_list
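# Illustrative example: [{"Name": "stable", "URL": "https://..."}] becomes
# [{"name": "stable", "url": "https://..."}].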
1486
1487 async def _local_async_exec(
1488 self,
1489 command: str,
1490 raise_exception_on_error: bool = False,
1491 show_error_log: bool = True,
1492 encode_utf8: bool = False,
1493 env: dict = None,
1494 ) -> (str, int):
1495
1496 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1497 self.log.debug(
1498 "Executing async local command: {}, env: {}".format(command, env)
1499 )
1500
1501 # split command
1502 command = shlex.split(command)
1503
1504 environ = os.environ.copy()
1505 if env:
1506 environ.update(env)
1507
1508 try:
1509 process = await asyncio.create_subprocess_exec(
1510 *command,
1511 stdout=asyncio.subprocess.PIPE,
1512 stderr=asyncio.subprocess.PIPE,
1513 env=environ,
1514 )
1515
1516 # wait for command terminate
1517 stdout, stderr = await process.communicate()
1518
1519 return_code = process.returncode
1520
1521 output = ""
1522 if stdout:
1523 output = stdout.decode("utf-8").strip()
1524 # output = stdout.decode()
1525 if stderr:
1526 output = stderr.decode("utf-8").strip()
1527 # output = stderr.decode()
1528
1529 if return_code != 0 and show_error_log:
1530 self.log.debug(
1531 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1532 )
1533 else:
1534 self.log.debug("Return code: {}".format(return_code))
1535
1536 if raise_exception_on_error and return_code != 0:
1537 raise K8sException(output)
1538
1539 if encode_utf8:
1540 output = output.encode("utf-8").strip()
1541 output = str(output).replace("\\n", "\n")
1542
1543 return output, return_code
1544
1545 except asyncio.CancelledError:
1546 raise
1547 except K8sException:
1548 raise
1549 except Exception as e:
1550 msg = "Exception executing command: {} -> {}".format(command, e)
1551 self.log.error(msg)
1552 if raise_exception_on_error:
1553 raise K8sException(e) from e
1554 else:
1555 return "", -1
1556
1557 async def _local_async_exec_pipe(
1558 self,
1559 command1: str,
1560 command2: str,
1561 raise_exception_on_error: bool = True,
1562 show_error_log: bool = True,
1563 encode_utf8: bool = False,
1564 env: dict = None,
1565 ):
1566
1567 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1568 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1569 command = "{} | {}".format(command1, command2)
1570 self.log.debug(
1571 "Executing async local command: {}, env: {}".format(command, env)
1572 )
1573
1574 # split command
1575 command1 = shlex.split(command1)
1576 command2 = shlex.split(command2)
1577
1578 environ = os.environ.copy()
1579 if env:
1580 environ.update(env)
1581
1582 try:
1583 read, write = os.pipe()
1584 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1585 os.close(write)
1586 process_2 = await asyncio.create_subprocess_exec(
1587 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1588 )
1589 os.close(read)
1590 stdout, stderr = await process_2.communicate()
1591
1592 return_code = process_2.returncode
1593
1594 output = ""
1595 if stdout:
1596 output = stdout.decode("utf-8").strip()
1597 # output = stdout.decode()
1598 if stderr:
1599 output = stderr.decode("utf-8").strip()
1600 # output = stderr.decode()
1601
1602 if return_code != 0 and show_error_log:
1603 self.log.debug(
1604 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1605 )
1606 else:
1607 self.log.debug("Return code: {}".format(return_code))
1608
1609 if raise_exception_on_error and return_code != 0:
1610 raise K8sException(output)
1611
1612 if encode_utf8:
1613 output = output.encode("utf-8").strip()
1614 output = str(output).replace("\\n", "\n")
1615
1616 return output, return_code
1617 except asyncio.CancelledError:
1618 raise
1619 except K8sException:
1620 raise
1621 except Exception as e:
1622 msg = "Exception executing command: {} -> {}".format(command, e)
1623 self.log.error(msg)
1624 if raise_exception_on_error:
1625 raise K8sException(e) from e
1626 else:
1627 return "", -1
1628
1629 async def _get_service(self, cluster_id, service_name, namespace):
1630 """
1631 Obtains the data of the specified service in the k8cluster.
1632
1633 :param cluster_id: id of a K8s cluster known by OSM
1634 :param service_name: name of the K8s service in the specified namespace
1635 :param namespace: K8s namespace used by the KDU instance
1636 :return: If successful, it will return a service with the following data:
1637 - `name` of the service
1638 - `type` of the service in the k8s cluster
1639 - `ports` List of ports offered by the service, for each port includes at least
1640 name, port, protocol
1641 - `cluster_ip` Internal ip to be used inside k8s cluster
1642 - `external_ip` List of external ips (in case they are available)
1643 """
1644
1645 # init config, env
1646 paths, env = self._init_paths_env(
1647 cluster_name=cluster_id, create_if_not_exist=True
1648 )
1649
1650 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1651 self.kubectl_command, paths["kube_config"], namespace, service_name
1652 )
1653
1654 output, _rc = await self._local_async_exec(
1655 command=command, raise_exception_on_error=True, env=env
1656 )
1657
1658 data = yaml.load(output, Loader=yaml.SafeLoader)
1659
1660 service = {
1661 "name": service_name,
1662 "type": self._get_deep(data, ("spec", "type")),
1663 "ports": self._get_deep(data, ("spec", "ports")),
1664 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1665 }
1666 if service["type"] == "LoadBalancer":
1667 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1668 ip_list = [elem["ip"] for elem in ip_map_list]
1669 service["external_ip"] = ip_list
1670
1671 return service
1672
1673 async def _exec_get_command(
1674 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1675 ):
1676 """Obtains information about the kdu instance."""
1677
1678 full_command = self._get_get_command(
1679 get_command, kdu_instance, namespace, kubeconfig
1680 )
1681
1682 output, _rc = await self._local_async_exec(command=full_command)
1683
1684 return output
1685
1686 async def _exec_inspect_command(
1687 self, inspect_command: str, kdu_model: str, repo_url: str = None
1688 ):
1689 """Obtains information about a Helm Chart package (`helm show` command)
1690
1691 Args:
1692 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1693 kdu_model: The name or path of an Helm Chart
1694 repo_url: Helm Chart repository url
1695
1696 Returns:
1697 str: the requested info about the Helm Chart package
1698 """
1699
1700 repo_str = ""
1701 if repo_url:
1702 repo_str = " --repo {}".format(repo_url)
1703
1704 idx = kdu_model.find("/")
1705 if idx >= 0:
1706 idx += 1
1707 kdu_model = kdu_model[idx:]
1708
1709 kdu_model, version = self._split_version(kdu_model)
1710 if version:
1711 version_str = "--version {}".format(version)
1712 else:
1713 version_str = ""
1714
1715 full_command = self._get_inspect_command(
1716 inspect_command, kdu_model, repo_str, version_str
1717 )
1718
1719 output, _rc = await self._local_async_exec(command=full_command)
1720
1721 return output
1722
1723 async def _get_replica_count_url(
1724 self,
1725 kdu_model: str,
1726 repo_url: str = None,
1727 resource_name: str = None,
1728 ):
1729 """Get the replica count value in the Helm Chart Values.
1730
1731 Args:
1732 kdu_model: The name or path of an Helm Chart
1733 repo_url: Helm Chart repository url
1734 resource_name: Resource name
1735
1736 Returns:
1737 A tuple (replicas, key): the replica count found and the key defining it ("replicaCount" or "replicas")
1738 """
1739
1740 kdu_values = yaml.load(
1741 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1742 Loader=yaml.SafeLoader,
1743 )
1744
1745 if not kdu_values:
1746 raise K8sException(
1747 "kdu_values not found for kdu_model {}".format(kdu_model)
1748 )
1749
1750 if resource_name:
1751 kdu_values = kdu_values.get(resource_name, None)
1752
1753 if not kdu_values:
1754 msg = "resource {} not found in the values in model {}".format(
1755 resource_name, kdu_model
1756 )
1757 self.log.error(msg)
1758 raise K8sException(msg)
1759
1760 duplicate_check = False
1761
1762 replica_str = ""
1763 replicas = None
1764
1765 if kdu_values.get("replicaCount", None):
1766 replicas = kdu_values["replicaCount"]
1767 replica_str = "replicaCount"
1768 elif kdu_values.get("replicas", None):
1769 duplicate_check = True
1770 replicas = kdu_values["replicas"]
1771 replica_str = "replicas"
1772 else:
1773 if resource_name:
1774 msg = (
1775 "replicaCount or replicas not found in the resource "
1776 "{} values in model {}. Cannot be scaled".format(
1777 resource_name, kdu_model
1778 )
1779 )
1780 else:
1781 msg = (
1782 "replicaCount or replicas not found in the values "
1783 "in model {}. Cannot be scaled".format(kdu_model)
1784 )
1785 self.log.error(msg)
1786 raise K8sException(msg)
1787
1788 # Check whether replicas and replicaCount are defined at the same time
1789 msg = "replicaCount and replicas exist at the same time"
1790 if duplicate_check:
1791 if "replicaCount" in kdu_values:
1792 self.log.error(msg)
1793 raise K8sException(msg)
1794 else:
1795 if "replicas" in kdu_values:
1796 self.log.error(msg)
1797 raise K8sException(msg)
1798
1799 return replicas, replica_str
1800
1801 async def _get_replica_count_instance(
1802 self,
1803 kdu_instance: str,
1804 namespace: str,
1805 kubeconfig: str,
1806 resource_name: str = None,
1807 ):
1808 """Get the replica count value in the instance.
1809
1810 Args:
1811 kdu_instance: The name of the KDU instance
1812 namespace: KDU instance namespace
1813 kubeconfig: path to the kubeconfig file of the cluster
1814 resource_name: Resource name
1815
1816 Returns:
1817 The replica count found in the instance values (from "replicaCount" or "replicas"), or None if not found
1818 """
1819
1820 kdu_values = yaml.load(
1821 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1822 Loader=yaml.SafeLoader,
1823 )
1824
1825 replicas = None
1826
1827 if kdu_values:
1828 resource_values = (
1829 kdu_values.get(resource_name, None) if resource_name else None
1830 )
1831 replicas = (
1832 (
1833 resource_values.get("replicaCount", None)
1834 or resource_values.get("replicas", None)
1835 )
1836 if resource_values
1837 else (
1838 kdu_values.get("replicaCount", None)
1839 or kdu_values.get("replicas", None)
1840 )
1841 )
1842
1843 return replicas
1844
1845 async def _store_status(
1846 self,
1847 cluster_id: str,
1848 operation: str,
1849 kdu_instance: str,
1850 namespace: str = None,
1851 db_dict: dict = None,
1852 ) -> None:
1853 """
1854 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1855
1856 :param cluster_id (str): the cluster where the KDU instance is deployed
1857 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1858 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1859 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1860 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1861 values for the keys:
1862 - "collection": The Mongo DB collection to write to
1863 - "filter": The query filter to use in the update process
1864 - "path": The dot separated keys which targets the object to be updated
1865 Defaults to None.
1866 """
1867
1868 try:
1869 detailed_status = await self._status_kdu(
1870 cluster_id=cluster_id,
1871 kdu_instance=kdu_instance,
1872 yaml_format=False,
1873 namespace=namespace,
1874 )
1875
1876 status = detailed_status.get("info").get("description")
1877 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1878
1879 # write status to db
1880 result = await self.write_app_status_to_db(
1881 db_dict=db_dict,
1882 status=str(status),
1883 detailed_status=str(detailed_status),
1884 operation=operation,
1885 )
1886
1887 if not result:
1888 self.log.info("Error writing in database. Task exiting...")
1889
1890 except asyncio.CancelledError as e:
1891 self.log.warning(
1892 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1893 )
1894 except Exception as e:
1895 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1896
1897 # params for use in -f file
1898 # returns values file option and filename (in order to delete it at the end)
1899 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1900
1901 if params and len(params) > 0:
1902 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1903
1904 def get_random_number():
1905 r = random.randrange(start=1, stop=99999999)
1906 s = str(r)
1907 while len(s) < 10:
1908 s = "0" + s
1909 return s
1910
1911 params2 = dict()
1912 for key in params:
1913 value = params.get(key)
1914 if "!!yaml" in str(value):
1915 value = yaml.safe_load(value[7:])
1916 params2[key] = value
1917
1918 values_file = get_random_number() + ".yaml"
1919 with open(values_file, "w") as stream:
1920 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1921
1922 return "-f {}".format(values_file), values_file
1923
1924 return "", None
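# Illustrative example: params such as
#   {"replicaCount": 2, "service": "!!yaml {type: NodePort}"}
# are dumped to a random "<nnnnnnnnnn>.yaml" file (the "!!yaml " prefix marks a
# value that must be parsed as YAML) and the method returns
#   ("-f <nnnnnnnnnn>.yaml", "<nnnnnnnnnn>.yaml")
# so that the caller can delete the temporary file afterwards.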
1925
1926 # params for use in --set option
1927 @staticmethod
1928 def _params_to_set_option(params: dict) -> str:
1929 params_str = ""
1930 if params and len(params) > 0:
1931 start = True
1932 for key in params:
1933 value = params.get(key, None)
1934 if value is not None:
1935 if start:
1936 params_str += "--set "
1937 start = False
1938 else:
1939 params_str += ","
1940 params_str += "{}={}".format(key, value)
1941 return params_str
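# Illustrative example: {"a": 1, "b": "x", "c": None} is rendered as
# "--set a=1,b=x"; keys whose value is None are skipped.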
1942
1943 @staticmethod
1944 def generate_kdu_instance_name(**kwargs):
1945 chart_name = kwargs["kdu_model"]
1946 # check embeded chart (file or dir)
1947 if chart_name.startswith("/"):
1948 # extract file or directory name
1949 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1950 # check URL
1951 elif "://" in chart_name:
1952 # extract last portion of URL
1953 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1954
1955 name = ""
1956 for c in chart_name:
1957 if c.isalpha() or c.isnumeric():
1958 name += c
1959 else:
1960 name += "-"
1961 if len(name) > 35:
1962 name = name[0:35]
1963
1964 # if it does not start with an alpha character, prefix 'a'
1965 if not name[0].isalpha():
1966 name = "a" + name
1967
1968 name += "-"
1969
1970 def get_random_number():
1971 r = random.randrange(start=1, stop=99999999)
1972 s = str(r)
1973 s = s.rjust(10, "0")
1974 return s
1975
1976 name = name + get_random_number()
1977 return name.lower()
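# Illustrative example: for kdu_model="stable/openldap" every character that is
# not alphanumeric becomes "-", the result is truncated to 35 characters,
# prefixed with "a" if it does not start with a letter, and a 10-digit random
# suffix is appended, e.g. "stable-openldap-0004379561".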
1978
1979 def _split_version(self, kdu_model: str) -> (str, str):
1980 version = None
1981 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
1982 parts = kdu_model.split(sep=":")
1983 if len(parts) == 2:
1984 version = str(parts[1])
1985 kdu_model = parts[0]
1986 return kdu_model, version
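# Illustrative example: "stable/openldap:1.2.3" splits into
# ("stable/openldap", "1.2.3"); a model without ":" or a local chart path
# (detected by _is_helm_chart_a_file) is returned unchanged with version None.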
1987
1988 async def _split_repo(self, kdu_model: str) -> str:
1989 repo_name = None
1990 idx = kdu_model.find("/")
1991 if idx >= 0:
1992 repo_name = kdu_model[:idx]
1993 return repo_name
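# Illustrative example: "stable/openldap" yields the repository name "stable";
# None is returned when the model contains no "/" (e.g. a plain chart name).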
1994
1995 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1996 """Obtain the Helm repository for an Helm Chart
1997
1998 Args:
1999 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2000 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2001
2002 Returns:
2003 str: the repository URL; if Helm Chart is a local one, the function returns None
2004 """
2005
2006 repo_url = None
2007 idx = kdu_model.find("/")
2008 if idx >= 0:
2009 repo_name = kdu_model[:idx]
2010 # Find repository link
2011 local_repo_list = await self.repo_list(cluster_uuid)
2012 # stop at the first repository whose name matches
2013 for repo in local_repo_list:
2014 if repo["name"] == repo_name:
2015 repo_url = repo["url"]
2016 break
2014 return repo_url