n2vc/k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 # Lock to avoid concurrent execution of helm commands
94 self.cmd_lock = asyncio.Lock()
95
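# Typical call flow (illustrative sketch only; assumes a concrete subclass of
# this connector, e.g. an OSM Helm v3 connector, and pre-existing OSM fs/db
# objects -- the names below are hypothetical):
#
#   conn = SomeHelmConnector(fs=fs, db=db, log=log)
#   cluster_id, sw_installed = await conn.init_env(kubeconfig_contents)
#   await conn.repo_add(cluster_id, "myrepo", "https://charts.example.com")
#   releases = await conn.instances_list(cluster_id)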
96 def _get_namespace(self, cluster_uuid: str) -> str:
97 """
98 Obtains the namespace used by the cluster with the uuid passed by argument
99
100 :param cluster_uuid: cluster's uuid
101 """
102
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster = self.db.get_one(
105 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
106 )
107 return k8scluster.get("namespace")
108
109 async def init_env(
110 self,
111 k8s_creds: str,
112 namespace: str = "kube-system",
113 reuse_cluster_uuid=None,
114 **kwargs,
115 ) -> (str, bool):
116 """
117 It prepares a given K8s cluster environment to run Charts
118
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
120 '.kube/config'
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
124 :param kwargs: Additional parameters (None yet)
125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
128 """
129
130 if reuse_cluster_uuid:
131 cluster_id = reuse_cluster_uuid
132 else:
133 cluster_id = str(uuid4())
134
135 self.log.debug(
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
137 )
138
139 paths, env = self._init_paths_env(
140 cluster_name=cluster_id, create_if_not_exist=True
141 )
142 # open() does not take a permission mode: write the kubeconfig first
143 with open(paths["kube_config"], "w") as f:
144 f.write(k8s_creds)
145 os.chmod(paths["kube_config"], stat.S_IRUSR | stat.S_IWUSR)  # restrict to 0600
146
147 # Code with initialization specific of helm version
148 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
149
150 # sync fs with local data
151 self.fs.reverse_sync(from_path=cluster_id)
152
153 self.log.info("Cluster {} initialized".format(cluster_id))
154
155 return cluster_id, n2vc_installed_sw
156
157 async def repo_add(
158 self,
159 cluster_uuid: str,
160 name: str,
161 url: str,
162 repo_type: str = "chart",
163 cert: str = None,
164 user: str = None,
165 password: str = None,
166 ):
167 self.log.debug(
168 "Cluster {}, adding {} repository {}. URL: {}".format(
169 cluster_uuid, repo_type, name, url
170 )
171 )
172
173 # init_env
174 paths, env = self._init_paths_env(
175 cluster_name=cluster_uuid, create_if_not_exist=True
176 )
177
178 # sync local dir
179 self.fs.sync(from_path=cluster_uuid)
180
181 # helm repo add name url
182 command = ("env KUBECONFIG={} {} repo add {} {}").format(
183 paths["kube_config"], self._helm_command, name, url
184 )
185
186 if cert:
187 temp_cert_file = os.path.join(
188 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
189 )
190 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
191 with open(temp_cert_file, "w") as the_cert:
192 the_cert.write(cert)
193 command += " --ca-file {}".format(temp_cert_file)
194
195 if user:
196 command += " --username={}".format(user)
197
198 if password:
199 command += " --password={}".format(password)
200
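# Example of the resulting command (illustrative values only; the kubeconfig
# path is provided by the subclass' _init_paths_env):
#   env KUBECONFIG=<kube_config_path> /usr/bin/helm repo add myrepo \
#       https://charts.example.com --ca-file <...>/helmcerts/temp.crt \
#       --username user --password pass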
201 self.log.debug("adding repo: {}".format(command))
202 await self._local_async_exec(
203 command=command, raise_exception_on_error=True, env=env
204 )
205
206 # helm repo update
207 command = "env KUBECONFIG={} {} repo update {}".format(
208 paths["kube_config"], self._helm_command, name
209 )
210 self.log.debug("updating repo: {}".format(command))
211 await self._local_async_exec(
212 command=command, raise_exception_on_error=False, env=env
213 )
214
215 # sync fs
216 self.fs.reverse_sync(from_path=cluster_uuid)
217
218 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
219 self.log.debug(
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid, repo_type, name
222 )
223 )
224
225 # init_env
226 paths, env = self._init_paths_env(
227 cluster_name=cluster_uuid, create_if_not_exist=True
228 )
229
230 # sync local dir
231 self.fs.sync(from_path=cluster_uuid)
232
233 # helm repo update
234 command = "{} repo update {}".format(self._helm_command, name)
235 self.log.debug("updating repo: {}".format(command))
236 await self._local_async_exec(
237 command=command, raise_exception_on_error=False, env=env
238 )
239
240 # sync fs
241 self.fs.reverse_sync(from_path=cluster_uuid)
242
243 async def repo_list(self, cluster_uuid: str) -> list:
244 """
245 Get the list of registered repositories
246
247 :return: list of registered repositories: [ (name, url) .... ]
248 """
249
250 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
251
252 # config filename
253 paths, env = self._init_paths_env(
254 cluster_name=cluster_uuid, create_if_not_exist=True
255 )
256
257 # sync local dir
258 self.fs.sync(from_path=cluster_uuid)
259
260 command = "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths["kube_config"], self._helm_command
262 )
263
264 # Do not raise an exception: if there are no repos we just want an empty list
265 output, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=False, env=env
267 )
268
269 # sync fs
270 self.fs.reverse_sync(from_path=cluster_uuid)
271
272 if _rc == 0:
273 if output and len(output) > 0:
274 repos = yaml.load(output, Loader=yaml.SafeLoader)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self._lower_keys_list(repos)
277 else:
278 return []
279 else:
280 return []
281
282 async def repo_remove(self, cluster_uuid: str, name: str):
283 self.log.debug(
284 "remove {} repositories for cluster {}".format(name, cluster_uuid)
285 )
286
287 # init env, paths
288 paths, env = self._init_paths_env(
289 cluster_name=cluster_uuid, create_if_not_exist=True
290 )
291
292 # sync local dir
293 self.fs.sync(from_path=cluster_uuid)
294
295 command = "env KUBECONFIG={} {} repo remove {}".format(
296 paths["kube_config"], self._helm_command, name
297 )
298 await self._local_async_exec(
299 command=command, raise_exception_on_error=True, env=env
300 )
301
302 # sync fs
303 self.fs.reverse_sync(from_path=cluster_uuid)
304
305 async def reset(
306 self,
307 cluster_uuid: str,
308 force: bool = False,
309 uninstall_sw: bool = False,
310 **kwargs,
311 ) -> bool:
312 """Reset a cluster
313
314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
315
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to also uninstall the software installed by the connector
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
321 """
322 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
323 self.log.debug(
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
325 cluster_uuid, uninstall_sw
326 )
327 )
328
329 # sync local dir
330 self.fs.sync(from_path=cluster_uuid)
331
332 # uninstall releases if needed.
333 if uninstall_sw:
334 releases = await self.instances_list(cluster_uuid=cluster_uuid)
335 if len(releases) > 0:
336 if force:
337 for r in releases:
338 try:
339 kdu_instance = r.get("name")
340 chart = r.get("chart")
341 self.log.debug(
342 "Uninstalling {} -> {}".format(chart, kdu_instance)
343 )
344 await self.uninstall(
345 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
346 )
347 except Exception as e:
348 # do not raise an exception here: uninstalling releases
349 # left over from previous installations has been observed
350 # to fail in some cases
351 self.log.warn(
352 "Error uninstalling release {}: {}".format(
353 kdu_instance, e
354 )
355 )
356 else:
357 msg = (
358 "Cluster uuid: {} has releases and force is not set. Leaving K8s helm environment as is"
359 ).format(cluster_uuid)
360 self.log.warn(msg)
361 uninstall_sw = (
362 False # allow removing the k8s cluster entry without uninstalling Tiller
363 )
364
365 if uninstall_sw:
366 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
367
368 # delete cluster directory
369 self.log.debug("Removing directory {}".format(cluster_uuid))
370 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
371 # Also remove the local directory if it still exists
372 direct = self.fs.path + "/" + cluster_uuid
373 shutil.rmtree(direct, ignore_errors=True)
374
375 return True
376
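# A remote chart reference has the form "<repo>/<chart>" (a single slash),
# while a chart provided as a local file or directory is referenced by a
# filesystem path containing more than one slash; that is the heuristic used
# below to tell the two cases apart.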
377 def _is_helm_chart_a_file(self, chart_name: str):
378 return chart_name.count("/") > 1
379
380 async def _install_impl(
381 self,
382 cluster_id: str,
383 kdu_model: str,
384 paths: dict,
385 env: dict,
386 kdu_instance: str,
387 atomic: bool = True,
388 timeout: float = 300,
389 params: dict = None,
390 db_dict: dict = None,
391 kdu_name: str = None,
392 namespace: str = None,
393 ):
394 # init env, paths
395 paths, env = self._init_paths_env(
396 cluster_name=cluster_id, create_if_not_exist=True
397 )
398
399 # params to str
400 params_str, file_to_delete = self._params_to_file_option(
401 cluster_id=cluster_id, params=params
402 )
403
404 # version
405 kdu_model, version = self._split_version(kdu_model)
406
407 repo = await self._split_repo(kdu_model)
408 if repo:
409 await self.repo_update(cluster_id, repo)
410
411 command = self._get_install_command(
412 kdu_model,
413 kdu_instance,
414 namespace,
415 params_str,
416 version,
417 atomic,
418 timeout,
419 paths["kube_config"],
420 )
421
422 self.log.debug("installing: {}".format(command))
423
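# In atomic mode helm runs in its own task while a second task stores the
# current release status in the database (_store_status); once helm finishes
# the status task is cancelled and a final status is written further below.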
424 if atomic:
425 # exec helm in a task
426 exec_task = asyncio.ensure_future(
427 coro_or_future=self._local_async_exec(
428 command=command, raise_exception_on_error=False, env=env
429 )
430 )
431
432 # write status in another task
433 status_task = asyncio.ensure_future(
434 coro_or_future=self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
440 )
441 )
442
443 # wait for execution task
444 await asyncio.wait([exec_task])
445
446 # cancel status task
447 status_task.cancel()
448
449 output, rc = exec_task.result()
450
451 else:
452
453 output, rc = await self._local_async_exec(
454 command=command, raise_exception_on_error=False, env=env
455 )
456
457 # remove temporal values yaml file
458 if file_to_delete:
459 os.remove(file_to_delete)
460
461 # write final status
462 await self._store_status(
463 cluster_id=cluster_id,
464 kdu_instance=kdu_instance,
465 namespace=namespace,
466 db_dict=db_dict,
467 operation="install",
468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
475 async def upgrade(
476 self,
477 cluster_uuid: str,
478 kdu_instance: str,
479 kdu_model: str = None,
480 atomic: bool = True,
481 timeout: float = 300,
482 params: dict = None,
483 db_dict: dict = None,
484 ):
485 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
486
487 # sync local dir
488 self.fs.sync(from_path=cluster_uuid)
489
490 # look for instance to obtain namespace
491 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
492 if not instance_info:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance))
494
495 # init env, paths
496 paths, env = self._init_paths_env(
497 cluster_name=cluster_uuid, create_if_not_exist=True
498 )
499
500 # sync local dir
501 self.fs.sync(from_path=cluster_uuid)
502
503 # params to str
504 params_str, file_to_delete = self._params_to_file_option(
505 cluster_id=cluster_uuid, params=params
506 )
507
508 # version
509 kdu_model, version = self._split_version(kdu_model)
510
511 repo = await self._split_repo(kdu_model)
512 if repo:
513 await self.repo_update(cluster_uuid, repo)
514
515 command = self._get_upgrade_command(
516 kdu_model,
517 kdu_instance,
518 instance_info["namespace"],
519 params_str,
520 version,
521 atomic,
522 timeout,
523 paths["kube_config"],
524 )
525
526 self.log.debug("upgrading: {}".format(command))
527
528 if atomic:
529
530 # exec helm in a task
531 exec_task = asyncio.ensure_future(
532 coro_or_future=self._local_async_exec(
533 command=command, raise_exception_on_error=False, env=env
534 )
535 )
536 # write status in another task
537 status_task = asyncio.ensure_future(
538 coro_or_future=self._store_status(
539 cluster_id=cluster_uuid,
540 kdu_instance=kdu_instance,
541 namespace=instance_info["namespace"],
542 db_dict=db_dict,
543 operation="upgrade",
544 )
545 )
546
547 # wait for execution task
548 await asyncio.wait([exec_task])
549
550 # cancel status task
551 status_task.cancel()
552 output, rc = exec_task.result()
553
554 else:
555
556 output, rc = await self._local_async_exec(
557 command=command, raise_exception_on_error=False, env=env
558 )
559
560 # remove temporal values yaml file
561 if file_to_delete:
562 os.remove(file_to_delete)
563
564 # write final status
565 await self._store_status(
566 cluster_id=cluster_uuid,
567 kdu_instance=kdu_instance,
568 namespace=instance_info["namespace"],
569 db_dict=db_dict,
570 operation="upgrade",
571 )
572
573 if rc != 0:
574 msg = "Error executing command: {}\nOutput: {}".format(command, output)
575 self.log.error(msg)
576 raise K8sException(msg)
577
578 # sync fs
579 self.fs.reverse_sync(from_path=cluster_uuid)
580
581 # return new revision number
582 instance = await self.get_instance_info(
583 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
584 )
585 if instance:
586 revision = int(instance.get("revision"))
587 self.log.debug("New revision: {}".format(revision))
588 return revision
589 else:
590 return 0
591
592 async def scale(
593 self,
594 kdu_instance: str,
595 scale: int,
596 resource_name: str,
597 total_timeout: float = 1800,
598 cluster_uuid: str = None,
599 kdu_model: str = None,
600 atomic: bool = True,
601 db_dict: dict = None,
602 **kwargs,
603 ):
604 """Scale a resource in a Helm Chart.
605
606 Args:
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
617
618 Returns:
619 True if successful, False otherwise
620 """
621
622 debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
623 if resource_name:
624 debug_msg = "scaling resource {} in model {} (cluster {})".format(
625 resource_name, kdu_model, cluster_uuid
626 )
627 
628 self.log.debug(debug_msg)
629
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
633 if not instance_info:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance))
635
636 # init env, paths
637 paths, env = self._init_paths_env(
638 cluster_name=cluster_uuid, create_if_not_exist=True
639 )
640
641 # version
642 kdu_model, version = self._split_version(kdu_model)
643
644 repo_url = await self._find_repo(kdu_model, cluster_uuid)
645
646 _, replica_str = await self._get_replica_count_url(
647 kdu_model, repo_url, resource_name
648 )
649
650 command = self._get_upgrade_scale_command(
651 kdu_model,
652 kdu_instance,
653 instance_info["namespace"],
654 scale,
655 version,
656 atomic,
657 replica_str,
658 total_timeout,
659 resource_name,
660 paths["kube_config"],
661 )
662
663 self.log.debug("scaling: {}".format(command))
664
665 if atomic:
666 # exec helm in a task
667 exec_task = asyncio.ensure_future(
668 coro_or_future=self._local_async_exec(
669 command=command, raise_exception_on_error=False, env=env
670 )
671 )
672 # write status in another task
673 status_task = asyncio.ensure_future(
674 coro_or_future=self._store_status(
675 cluster_id=cluster_uuid,
676 kdu_instance=kdu_instance,
677 namespace=instance_info["namespace"],
678 db_dict=db_dict,
679 operation="scale",
680 )
681 )
682
683 # wait for execution task
684 await asyncio.wait([exec_task])
685
686 # cancel status task
687 status_task.cancel()
688 output, rc = exec_task.result()
689
690 else:
691 output, rc = await self._local_async_exec(
692 command=command, raise_exception_on_error=False, env=env
693 )
694
695 # write final status
696 await self._store_status(
697 cluster_id=cluster_uuid,
698 kdu_instance=kdu_instance,
699 namespace=instance_info["namespace"],
700 db_dict=db_dict,
701 operation="scale",
702 )
703
704 if rc != 0:
705 msg = "Error executing command: {}\nOutput: {}".format(command, output)
706 self.log.error(msg)
707 raise K8sException(msg)
708
709 # sync fs
710 self.fs.reverse_sync(from_path=cluster_uuid)
711
712 return True
713
714 async def get_scale_count(
715 self,
716 resource_name: str,
717 kdu_instance: str,
718 cluster_uuid: str,
719 kdu_model: str,
720 **kwargs,
721 ) -> int:
722 """Get a resource scale count.
723
724 Args:
725 cluster_uuid: The UUID of the cluster
726 resource_name: Resource name
727 kdu_instance: KDU instance name
728 kdu_model: The name or path of a Helm Chart
729 kwargs: Additional parameters
730
731 Returns:
732 Resource instance count
733 """
734
735 self.log.debug(
736 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
737 )
738
739 # look for instance to obtain namespace
740 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
741 if not instance_info:
742 raise K8sException("kdu_instance {} not found".format(kdu_instance))
743
744 # init env, paths
745 paths, env = self._init_paths_env(
746 cluster_name=cluster_uuid, create_if_not_exist=True
747 )
748
749 replicas = await self._get_replica_count_instance(
750 kdu_instance=kdu_instance,
751 namespace=instance_info["namespace"],
752 kubeconfig=paths["kube_config"],
753 resource_name=resource_name,
754 )
755
756 # Get default value if scale count is not found from provided values
757 if not replicas:
758 repo_url = await self._find_repo(
759 kdu_model=kdu_model, cluster_uuid=cluster_uuid
760 )
761 replicas, _ = await self._get_replica_count_url(
762 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
763 )
764
765 if not replicas:
766 msg = "Replica count not found. Cannot be scaled"
767 self.log.error(msg)
768 raise K8sException(msg)
769
770 return int(replicas)
771
772 async def rollback(
773 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
774 ):
775 self.log.debug(
776 "rollback kdu_instance {} to revision {} from cluster {}".format(
777 kdu_instance, revision, cluster_uuid
778 )
779 )
780
781 # sync local dir
782 self.fs.sync(from_path=cluster_uuid)
783
784 # look for instance to obtain namespace
785 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
786 if not instance_info:
787 raise K8sException("kdu_instance {} not found".format(kdu_instance))
788
789 # init env, paths
790 paths, env = self._init_paths_env(
791 cluster_name=cluster_uuid, create_if_not_exist=True
792 )
793
794 # sync local dir
795 self.fs.sync(from_path=cluster_uuid)
796
797 command = self._get_rollback_command(
798 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
799 )
800
801 self.log.debug("rolling_back: {}".format(command))
802
803 # exec helm in a task
804 exec_task = asyncio.ensure_future(
805 coro_or_future=self._local_async_exec(
806 command=command, raise_exception_on_error=False, env=env
807 )
808 )
809 # write status in another task
810 status_task = asyncio.ensure_future(
811 coro_or_future=self._store_status(
812 cluster_id=cluster_uuid,
813 kdu_instance=kdu_instance,
814 namespace=instance_info["namespace"],
815 db_dict=db_dict,
816 operation="rollback",
817 )
818 )
819
820 # wait for execution task
821 await asyncio.wait([exec_task])
822
823 # cancel status task
824 status_task.cancel()
825
826 output, rc = exec_task.result()
827
828 # write final status
829 await self._store_status(
830 cluster_id=cluster_uuid,
831 kdu_instance=kdu_instance,
832 namespace=instance_info["namespace"],
833 db_dict=db_dict,
834 operation="rollback",
835 )
836
837 if rc != 0:
838 msg = "Error executing command: {}\nOutput: {}".format(command, output)
839 self.log.error(msg)
840 raise K8sException(msg)
841
842 # sync fs
843 self.fs.reverse_sync(from_path=cluster_uuid)
844
845 # return new revision number
846 instance = await self.get_instance_info(
847 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
848 )
849 if instance:
850 revision = int(instance.get("revision"))
851 self.log.debug("New revision: {}".format(revision))
852 return revision
853 else:
854 return 0
855
856 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
857 """
858 Removes an existing KDU instance. It implicitly uses the `delete` or `uninstall` call
859 (this call should happen after all _terminate-config-primitive_ actions of the VNF
860 have been invoked).
861
862 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
863 :param kdu_instance: unique name for the KDU instance to be deleted
864 :param kwargs: Additional parameters (None yet)
865 :return: True if successful
866 """
867
868 self.log.debug(
869 "uninstall kdu_instance {} from cluster {}".format(
870 kdu_instance, cluster_uuid
871 )
872 )
873
874 # sync local dir
875 self.fs.sync(from_path=cluster_uuid)
876
877 # look for instance to obtain namespace
878 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
879 if not instance_info:
880 self.log.warning("kdu_instance {} not found".format(kdu_instance))
881 return True
882 # init env, paths
883 paths, env = self._init_paths_env(
884 cluster_name=cluster_uuid, create_if_not_exist=True
885 )
886
887 # sync local dir
888 self.fs.sync(from_path=cluster_uuid)
889
890 command = self._get_uninstall_command(
891 kdu_instance, instance_info["namespace"], paths["kube_config"]
892 )
893 output, _rc = await self._local_async_exec(
894 command=command, raise_exception_on_error=True, env=env
895 )
896
897 # sync fs
898 self.fs.reverse_sync(from_path=cluster_uuid)
899
900 return self._output_to_table(output)
901
902 async def instances_list(self, cluster_uuid: str) -> list:
903 """
904 returns a list of deployed releases in a cluster
905
906 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
907 :return: list of deployed releases, each one as a dictionary
908 """
909
910 self.log.debug("list releases for cluster {}".format(cluster_uuid))
911
912 # sync local dir
913 self.fs.sync(from_path=cluster_uuid)
914
915 # execute internal command
916 result = await self._instances_list(cluster_uuid)
917
918 # sync fs
919 self.fs.reverse_sync(from_path=cluster_uuid)
920
921 return result
922
923 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
924 instances = await self.instances_list(cluster_uuid=cluster_uuid)
925 for instance in instances:
926 if instance.get("name") == kdu_instance:
927 return instance
928 self.log.debug("Instance {} not found".format(kdu_instance))
929 return None
930
931 async def upgrade_charm(
932 self,
933 ee_id: str = None,
934 path: str = None,
935 charm_id: str = None,
936 charm_type: str = None,
937 timeout: float = None,
938 ) -> str:
939 """This method upgrade charms in VNFs
940
941 Args:
942 ee_id: Execution environment id
943 path: Local path to the charm
944 charm_id: charm-id
945 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
946 timeout: (Float) Timeout for the ns update operation
947
948 Returns:
949 The output of the update operation if status equals to "completed"
950 """
951 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
952
953 async def exec_primitive(
954 self,
955 cluster_uuid: str = None,
956 kdu_instance: str = None,
957 primitive_name: str = None,
958 timeout: float = 300,
959 params: dict = None,
960 db_dict: dict = None,
961 **kwargs,
962 ) -> str:
963 """Exec primitive (Juju action)
964
965 :param cluster_uuid: The UUID of the cluster or namespace:cluster
966 :param kdu_instance: The unique name of the KDU instance
967 :param primitive_name: Name of action that will be executed
968 :param timeout: Timeout for action execution
969 :param params: Dictionary of all the parameters needed for the action
970 :db_dict: Dictionary for any additional data
971 :param kwargs: Additional parameters (None yet)
972
973 :return: Returns the output of the action
974 """
975 raise K8sException(
976 "KDUs deployed with Helm don't support actions "
977 "different from rollback, upgrade and status"
978 )
979
980 async def get_services(
981 self, cluster_uuid: str, kdu_instance: str, namespace: str
982 ) -> list:
983 """
984 Returns a list of services defined for the specified kdu instance.
985
986 :param cluster_uuid: UUID of a K8s cluster known by OSM
987 :param kdu_instance: unique name for the KDU instance
988 :param namespace: K8s namespace used by the KDU instance
989 :return: If successful, it will return a list of services. Each service
990 can have the following data:
991 - `name` of the service
992 - `type` type of service in the k8s cluster
993 - `ports` List of ports offered by the service, for each port includes at least
994 name, port, protocol
995 - `cluster_ip` Internal ip to be used inside k8s cluster
996 - `external_ip` List of external ips (in case they are available)
997 """
998
999 self.log.debug(
1000 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1001 cluster_uuid, kdu_instance
1002 )
1003 )
1004
1005 # init env, paths
1006 paths, env = self._init_paths_env(
1007 cluster_name=cluster_uuid, create_if_not_exist=True
1008 )
1009
1010 # sync local dir
1011 self.fs.sync(from_path=cluster_uuid)
1012
1013 # get list of services names for kdu
1014 service_names = await self._get_services(
1015 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1016 )
1017
1018 service_list = []
1019 for service in service_names:
1020 service = await self._get_service(cluster_uuid, service, namespace)
1021 service_list.append(service)
1022
1023 # sync fs
1024 self.fs.reverse_sync(from_path=cluster_uuid)
1025
1026 return service_list
1027
1028 async def get_service(
1029 self, cluster_uuid: str, service_name: str, namespace: str
1030 ) -> object:
1031
1032 self.log.debug(
1033 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1034 service_name, namespace, cluster_uuid
1035 )
1036 )
1037
1038 # sync local dir
1039 self.fs.sync(from_path=cluster_uuid)
1040
1041 service = await self._get_service(cluster_uuid, service_name, namespace)
1042
1043 # sync fs
1044 self.fs.reverse_sync(from_path=cluster_uuid)
1045
1046 return service
1047
1048 async def status_kdu(
1049 self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
1050 ) -> Union[str, dict]:
1051 """
1052 This call would retrieve tha current state of a given KDU instance. It would be
1053 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1054 values_ of the configuration parameters applied to a given instance. This call
1055 would be based on the `status` call.
1056
1057 :param cluster_uuid: UUID of a K8s cluster known by OSM
1058 :param kdu_instance: unique name for the KDU instance
1059 :param kwargs: Additional parameters (None yet)
1060 :param yaml_format: if the return shall be returned as an YAML string or as a
1061 dictionary
1062 :return: If successful, it will return the following vector of arguments:
1063 - K8s `namespace` in the cluster where the KDU lives
1064 - `state` of the KDU instance. It can be:
1065 - UNKNOWN
1066 - DEPLOYED
1067 - DELETED
1068 - SUPERSEDED
1069 - FAILED or
1070 - DELETING
1071 - List of `resources` (objects) that this release consists of, sorted by kind,
1072 and the status of those resources
1073 - Last `deployment_time`.
1074
1075 """
1076 self.log.debug(
1077 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1078 cluster_uuid, kdu_instance
1079 )
1080 )
1081
1082 # sync local dir
1083 self.fs.sync(from_path=cluster_uuid)
1084
1085 # get instance: needed to obtain namespace
1086 instances = await self._instances_list(cluster_id=cluster_uuid)
1087 for instance in instances:
1088 if instance.get("name") == kdu_instance:
1089 break
1090 else:
1091 # instance does not exist
1092 raise K8sException(
1093 "Instance name: {} not found in cluster: {}".format(
1094 kdu_instance, cluster_uuid
1095 )
1096 )
1097
1098 status = await self._status_kdu(
1099 cluster_id=cluster_uuid,
1100 kdu_instance=kdu_instance,
1101 namespace=instance["namespace"],
1102 yaml_format=yaml_format,
1103 show_error_log=True,
1104 )
1105
1106 # sync fs
1107 self.fs.reverse_sync(from_path=cluster_uuid)
1108
1109 return status
1110
1111 async def get_values_kdu(
1112 self, kdu_instance: str, namespace: str, kubeconfig: str
1113 ) -> str:
1114
1115 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1116
1117 return await self._exec_get_command(
1118 get_command="values",
1119 kdu_instance=kdu_instance,
1120 namespace=namespace,
1121 kubeconfig=kubeconfig,
1122 )
1123
1124 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1125 """Method to obtain the Helm Chart package's values
1126
1127 Args:
1128 kdu_model: The name or path of a Helm Chart
1129 repo_url: Helm Chart repository url
1130
1131 Returns:
1132 str: the values of the Helm Chart package
1133 """
1134
1135 self.log.debug(
1136 "inspect kdu_model values {} from (optional) repo: {}".format(
1137 kdu_model, repo_url
1138 )
1139 )
1140
1141 return await self._exec_inspect_command(
1142 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1143 )
1144
1145 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1146
1147 self.log.debug(
1148 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1149 )
1150
1151 return await self._exec_inspect_command(
1152 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1153 )
1154
1155 async def synchronize_repos(self, cluster_uuid: str):
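# Aligns the helm repos configured in the cluster with those registered in the
# OSM database: repos missing locally (or whose URL changed) are added, repos
# no longer in the database are removed (the "stable" repo is never deleted).
# Returns the list of deleted repo identifiers and a dict of added repos.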
1156
1157 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1158 try:
1159 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1160 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1161
1162 local_repo_list = await self.repo_list(cluster_uuid)
1163 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1164
1165 deleted_repo_list = []
1166 added_repo_dict = {}
1167
1168 # iterate over the list of repos in the database that should be
1169 # added if not present
1170 for repo_name, db_repo in db_repo_dict.items():
1171 try:
1172 # check if it is already present
1173 curr_repo_url = local_repo_dict.get(db_repo["name"])
1174 repo_id = db_repo.get("_id")
1175 if curr_repo_url != db_repo["url"]:
1176 if curr_repo_url:
1177 self.log.debug(
1178 "repo {} url changed, delete and and again".format(
1179 db_repo["url"]
1180 )
1181 )
1182 await self.repo_remove(cluster_uuid, db_repo["name"])
1183 deleted_repo_list.append(repo_id)
1184
1185 # add repo
1186 self.log.debug("add repo {}".format(db_repo["name"]))
1187 if "ca_cert" in db_repo:
1188 await self.repo_add(
1189 cluster_uuid,
1190 db_repo["name"],
1191 db_repo["url"],
1192 cert=db_repo["ca_cert"],
1193 )
1194 else:
1195 await self.repo_add(
1196 cluster_uuid,
1197 db_repo["name"],
1198 db_repo["url"],
1199 )
1200 added_repo_dict[repo_id] = db_repo["name"]
1201 except Exception as e:
1202 raise K8sException(
1203 "Error adding repo id: {}, err_msg: {} ".format(
1204 repo_id, repr(e)
1205 )
1206 )
1207
1208 # Delete repos that are present locally but not in the database
1209 for repo_name in local_repo_dict:
1210 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1211 self.log.debug("delete repo {}".format(repo_name))
1212 try:
1213 await self.repo_remove(cluster_uuid, repo_name)
1214 deleted_repo_list.append(repo_name)
1215 except Exception as e:
1216 self.log.warning(
1217 "Error deleting repo, name: {}, err_msg: {}".format(
1218 repo_name, str(e)
1219 )
1220 )
1221
1222 return deleted_repo_list, added_repo_dict
1223
1224 except K8sException:
1225 raise
1226 except Exception as e:
1227 # log the error and re-raise it as a generic exception
1228 self.log.error("Error synchronizing repos: {}".format(e))
1229 raise Exception("Error synchronizing repos: {}".format(e))
1230
1231 def _get_db_repos_dict(self, repo_ids: list):
1232 db_repos_dict = {}
1233 for repo_id in repo_ids:
1234 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1235 db_repos_dict[db_repo["name"]] = db_repo
1236 return db_repos_dict
1237
1238 """
1239 ####################################################################################
1240 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1241 ####################################################################################
1242 """
1243
1244 @abc.abstractmethod
1245 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1246 """
1247 Creates the base cluster and kube dirs and returns them.
1248 Also creates the helm3 dirs according to the new directory specification; their
1249 paths are not returned but assigned to helm environment variables
1250
1251 :param cluster_name: cluster_name
1252 :return: Dictionary with config_paths and dictionary with helm environment variables
1253 """
1254
1255 @abc.abstractmethod
1256 async def _cluster_init(self, cluster_id, namespace, paths, env):
1257 """
1258 Implements the helm version dependent cluster initialization
1259 """
1260
1261 @abc.abstractmethod
1262 async def _instances_list(self, cluster_id):
1263 """
1264 Implements the helm version dependent helm instances list
1265 """
1266
1267 @abc.abstractmethod
1268 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1269 """
1270 Implements the helm version dependent method to obtain services from a helm instance
1271 """
1272
1273 @abc.abstractmethod
1274 async def _status_kdu(
1275 self,
1276 cluster_id: str,
1277 kdu_instance: str,
1278 namespace: str = None,
1279 yaml_format: bool = False,
1280 show_error_log: bool = False,
1281 ) -> Union[str, dict]:
1282 """
1283 Implements the helm version dependent method to obtain status of a helm instance
1284 """
1285
1286 @abc.abstractmethod
1287 def _get_install_command(
1288 self,
1289 kdu_model,
1290 kdu_instance,
1291 namespace,
1292 params_str,
1293 version,
1294 atomic,
1295 timeout,
1296 kubeconfig,
1297 ) -> str:
1298 """
1299 Obtain command to be executed to install the indicated instance
1300 """
1301
1302 @abc.abstractmethod
1303 def _get_upgrade_scale_command(
1304 self,
1305 kdu_model,
1306 kdu_instance,
1307 namespace,
1308 count,
1309 version,
1310 atomic,
1311 replicas,
1312 timeout,
1313 resource_name,
1314 kubeconfig,
1315 ) -> str:
1316 """Obtain command to be executed to upgrade the indicated instance."""
1317
1318 @abc.abstractmethod
1319 def _get_upgrade_command(
1320 self,
1321 kdu_model,
1322 kdu_instance,
1323 namespace,
1324 params_str,
1325 version,
1326 atomic,
1327 timeout,
1328 kubeconfig,
1329 ) -> str:
1330 """
1331 Obtain command to be executed to upgrade the indicated instance
1332 """
1333
1334 @abc.abstractmethod
1335 def _get_rollback_command(
1336 self, kdu_instance, namespace, revision, kubeconfig
1337 ) -> str:
1338 """
1339 Obtain command to be executed to rollback the indicated instance
1340 """
1341
1342 @abc.abstractmethod
1343 def _get_uninstall_command(
1344 self, kdu_instance: str, namespace: str, kubeconfig: str
1345 ) -> str:
1346 """
1347 Obtain command to be executed to delete the indicated instance
1348 """
1349
1350 @abc.abstractmethod
1351 def _get_inspect_command(
1352 self, show_command: str, kdu_model: str, repo_str: str, version: str
1353 ):
1354 """Generates the command to obtain the information about an Helm Chart package
1355 (´helm show ...´ command)
1356
1357 Args:
1358 show_command: the second part of the command (`helm show <show_command>`)
1359 kdu_model: The name or path of an Helm Chart
1360 repo_url: Helm Chart repository url
1361 version: constraint with specific version of the Chart to use
1362
1363 Returns:
1364 str: the generated Helm Chart command
1365 """
1366
1367 @abc.abstractmethod
1368 def _get_get_command(
1369 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1370 ):
1371 """Obtain command to be executed to get information about the kdu instance."""
1372
1373 @abc.abstractmethod
1374 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1375 """
1376 Method call to uninstall cluster software for helm. This method depends
1377 on the helm version:
1378 For Helm v2 it will be called when Tiller must be uninstalled
1379 For Helm v3 it does nothing and does not need to be called
1380 """
1381
1382 @abc.abstractmethod
1383 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1384 """
1385 Obtains the cluster repos identifiers
1386 """
1387
1388 """
1389 ####################################################################################
1390 ################################### P R I V A T E ##################################
1391 ####################################################################################
1392 """
1393
1394 @staticmethod
1395 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1396 if os.path.exists(filename):
1397 return True
1398 else:
1399 msg = "File {} does not exist".format(filename)
1400 if exception_if_not_exists:
1401 raise K8sException(msg)
1402
1403 @staticmethod
1404 def _remove_multiple_spaces(strobj):
1405 strobj = strobj.strip()
1406 while " " in strobj:
1407 strobj = strobj.replace(" ", " ")
1408 return strobj
1409
1410 @staticmethod
1411 def _output_to_lines(output: str) -> list:
1412 output_lines = list()
1413 lines = output.splitlines(keepends=False)
1414 for line in lines:
1415 line = line.strip()
1416 if len(line) > 0:
1417 output_lines.append(line)
1418 return output_lines
1419
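# Example (illustrative): "NAME\tREVISION\nmyrelease\t2"
#   -> [["NAME", "REVISION"], ["myrelease", "2"]]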
1420 @staticmethod
1421 def _output_to_table(output: str) -> list:
1422 output_table = list()
1423 lines = output.splitlines(keepends=False)
1424 for line in lines:
1425 line = line.replace("\t", " ")
1426 line_list = list()
1427 output_table.append(line_list)
1428 cells = line.split(sep=" ")
1429 for cell in cells:
1430 cell = cell.strip()
1431 if len(cell) > 0:
1432 line_list.append(cell)
1433 return output_table
1434
1435 @staticmethod
1436 def _parse_services(output: str) -> list:
1437 lines = output.splitlines(keepends=False)
1438 services = []
1439 for line in lines:
1440 line = line.replace("\t", " ")
1441 cells = line.split(sep=" ")
1442 if len(cells) > 0 and cells[0].startswith("service/"):
1443 elems = cells[0].split(sep="/")
1444 if len(elems) > 1:
1445 services.append(elems[1])
1446 return services
1447
1448 @staticmethod
1449 def _get_deep(dictionary: dict, members: tuple):
1450 target = dictionary
1451 value = None
1452 try:
1453 for m in members:
1454 value = target.get(m)
1455 if not value:
1456 return None
1457 else:
1458 target = value
1459 except Exception:
1460 pass
1461 return value
1462
1463 # find key:value in several lines
1464 @staticmethod
1465 def _find_in_lines(p_lines: list, p_key: str) -> str:
1466 for line in p_lines:
1467 try:
1468 if line.startswith(p_key + ":"):
1469 parts = line.split(":")
1470 the_value = parts[1].strip()
1471 return the_value
1472 except Exception:
1473 # ignore it
1474 pass
1475 return None
1476
1477 @staticmethod
1478 def _lower_keys_list(input_list: list):
1479 """
1480 Transform the keys in a list of dictionaries to lower case and returns a new list
1481 of dictionaries
1482 """
1483 new_list = []
1484 if input_list:
1485 for dictionary in input_list:
1486 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1487 new_list.append(new_dict)
1488 return new_list
1489
1490 async def _local_async_exec(
1491 self,
1492 command: str,
1493 raise_exception_on_error: bool = False,
1494 show_error_log: bool = True,
1495 encode_utf8: bool = False,
1496 env: dict = None,
1497 ) -> (str, int):
1498
1499 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1500 self.log.debug(
1501 "Executing async local command: {}, env: {}".format(command, env)
1502 )
1503
1504 # split command
1505 command = shlex.split(command)
1506
1507 environ = os.environ.copy()
1508 if env:
1509 environ.update(env)
1510
1511 try:
1512 async with self.cmd_lock:
1513 process = await asyncio.create_subprocess_exec(
1514 *command,
1515 stdout=asyncio.subprocess.PIPE,
1516 stderr=asyncio.subprocess.PIPE,
1517 env=environ,
1518 )
1519
1520 # wait for command terminate
1521 stdout, stderr = await process.communicate()
1522
1523 return_code = process.returncode
1524
1525 output = ""
1526 if stdout:
1527 output = stdout.decode("utf-8").strip()
1528 # output = stdout.decode()
1529 if stderr:
1530 output = stderr.decode("utf-8").strip()
1531 # output = stderr.decode()
1532
1533 if return_code != 0 and show_error_log:
1534 self.log.debug(
1535 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1536 )
1537 else:
1538 self.log.debug("Return code: {}".format(return_code))
1539
1540 if raise_exception_on_error and return_code != 0:
1541 raise K8sException(output)
1542
1543 if encode_utf8:
1544 output = output.encode("utf-8").strip()
1545 output = str(output).replace("\\n", "\n")
1546
1547 return output, return_code
1548
1549 except asyncio.CancelledError:
1550 raise
1551 except K8sException:
1552 raise
1553 except Exception as e:
1554 msg = "Exception executing command: {} -> {}".format(command, e)
1555 self.log.error(msg)
1556 if raise_exception_on_error:
1557 raise K8sException(e) from e
1558 else:
1559 return "", -1
1560
1561 async def _local_async_exec_pipe(
1562 self,
1563 command1: str,
1564 command2: str,
1565 raise_exception_on_error: bool = True,
1566 show_error_log: bool = True,
1567 encode_utf8: bool = False,
1568 env: dict = None,
1569 ):
1570
1571 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1572 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1573 command = "{} | {}".format(command1, command2)
1574 self.log.debug(
1575 "Executing async local command: {}, env: {}".format(command, env)
1576 )
1577
1578 # split command
1579 command1 = shlex.split(command1)
1580 command2 = shlex.split(command2)
1581
1582 environ = os.environ.copy()
1583 if env:
1584 environ.update(env)
1585
1586 try:
1587 async with self.cmd_lock:
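# emulate "command1 | command2" with an OS-level pipe: command1 writes to
# the pipe's write end, command2 reads from its read end; the parent closes
# both ends after handing them over so command2 sees EOF once command1 exits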
1588 read, write = os.pipe()
1589 await asyncio.create_subprocess_exec(
1590 *command1, stdout=write, env=environ
1591 )
1592 os.close(write)
1593 process_2 = await asyncio.create_subprocess_exec(
1594 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1595 )
1596 os.close(read)
1597 stdout, stderr = await process_2.communicate()
1598
1599 return_code = process_2.returncode
1600
1601 output = ""
1602 if stdout:
1603 output = stdout.decode("utf-8").strip()
1604 # output = stdout.decode()
1605 if stderr:
1606 output = stderr.decode("utf-8").strip()
1607 # output = stderr.decode()
1608
1609 if return_code != 0 and show_error_log:
1610 self.log.debug(
1611 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1612 )
1613 else:
1614 self.log.debug("Return code: {}".format(return_code))
1615
1616 if raise_exception_on_error and return_code != 0:
1617 raise K8sException(output)
1618
1619 if encode_utf8:
1620 output = output.encode("utf-8").strip()
1621 output = str(output).replace("\\n", "\n")
1622
1623 return output, return_code
1624 except asyncio.CancelledError:
1625 raise
1626 except K8sException:
1627 raise
1628 except Exception as e:
1629 msg = "Exception executing command: {} -> {}".format(command, e)
1630 self.log.error(msg)
1631 if raise_exception_on_error:
1632 raise K8sException(e) from e
1633 else:
1634 return "", -1
1635
1636 async def _get_service(self, cluster_id, service_name, namespace):
1637 """
1638 Obtains the data of the specified service in the k8cluster.
1639
1640 :param cluster_id: id of a K8s cluster known by OSM
1641 :param service_name: name of the K8s service in the specified namespace
1642 :param namespace: K8s namespace used by the KDU instance
1643 :return: If successful, it will return a service with the following data:
1644 - `name` of the service
1645 - `type` type of service in the k8s cluster
1646 - `ports` List of ports offered by the service, for each port includes at least
1647 name, port, protocol
1648 - `cluster_ip` Internal ip to be used inside k8s cluster
1649 - `external_ip` List of external ips (in case they are available)
1650 """
1651
1652 # init config, env
1653 paths, env = self._init_paths_env(
1654 cluster_name=cluster_id, create_if_not_exist=True
1655 )
1656
1657 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1658 self.kubectl_command, paths["kube_config"], namespace, service_name
1659 )
1660
1661 output, _rc = await self._local_async_exec(
1662 command=command, raise_exception_on_error=True, env=env
1663 )
1664
1665 data = yaml.load(output, Loader=yaml.SafeLoader)
1666
1667 service = {
1668 "name": service_name,
1669 "type": self._get_deep(data, ("spec", "type")),
1670 "ports": self._get_deep(data, ("spec", "ports")),
1671 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1672 }
1673 if service["type"] == "LoadBalancer":
1674 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1675 ip_list = [elem["ip"] for elem in ip_map_list]
1676 service["external_ip"] = ip_list
1677
1678 return service
1679
1680 async def _exec_get_command(
1681 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1682 ):
1683 """Obtains information about the kdu instance."""
1684
1685 full_command = self._get_get_command(
1686 get_command, kdu_instance, namespace, kubeconfig
1687 )
1688
1689 output, _rc = await self._local_async_exec(command=full_command)
1690
1691 return output
1692
1693 async def _exec_inspect_command(
1694 self, inspect_command: str, kdu_model: str, repo_url: str = None
1695 ):
1696 """Obtains information about an Helm Chart package (´helm show´ command)
1697
1698 Args:
1699 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1700 kdu_model: The name or path of an Helm Chart
1701 repo_url: Helm Chart repository url
1702
1703 Returns:
1704 str: the requested info about the Helm Chart package
1705 """
1706
1707 repo_str = ""
1708 if repo_url:
1709 repo_str = " --repo {}".format(repo_url)
1710
1711 idx = kdu_model.find("/")
1712 if idx >= 0:
1713 idx += 1
1714 kdu_model = kdu_model[idx:]
1715
1716 kdu_model, version = self._split_version(kdu_model)
1717 if version:
1718 version_str = "--version {}".format(version)
1719 else:
1720 version_str = ""
1721
1722 full_command = self._get_inspect_command(
1723 inspect_command, kdu_model, repo_str, version_str
1724 )
1725
1726 output, _rc = await self._local_async_exec(command=full_command)
1727
1728 return output
1729
1730 async def _get_replica_count_url(
1731 self,
1732 kdu_model: str,
1733 repo_url: str = None,
1734 resource_name: str = None,
1735 ):
1736 """Get the replica count value in the Helm Chart Values.
1737
1738 Args:
1739 kdu_model: The name or path of a Helm Chart
1740 repo_url: Helm Chart repository url
1741 resource_name: Resource name
1742
1743 Returns:
1744 (replicas, replica_str): the replica value found in the chart values and the key it came from ("replicaCount" or "replicas")
1745 """
1746
1747 kdu_values = yaml.load(
1748 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1749 Loader=yaml.SafeLoader,
1750 )
1751
1752 if not kdu_values:
1753 raise K8sException(
1754 "kdu_values not found for kdu_model {}".format(kdu_model)
1755 )
1756
1757 if resource_name:
1758 kdu_values = kdu_values.get(resource_name, None)
1759
1760 if not kdu_values:
1761 msg = "resource {} not found in the values in model {}".format(
1762 resource_name, kdu_model
1763 )
1764 self.log.error(msg)
1765 raise K8sException(msg)
1766
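# charts may expose the scale value either as "replicaCount" or as "replicas";
# if both keys are present at the same level the value is ambiguous and the
# duplicate check below rejects the scale operation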
1767 duplicate_check = False
1768
1769 replica_str = ""
1770 replicas = None
1771
1772 if kdu_values.get("replicaCount", None):
1773 replicas = kdu_values["replicaCount"]
1774 replica_str = "replicaCount"
1775 elif kdu_values.get("replicas", None):
1776 duplicate_check = True
1777 replicas = kdu_values["replicas"]
1778 replica_str = "replicas"
1779 else:
1780 if resource_name:
1781 msg = (
1782 "replicaCount or replicas not found in the resource"
1783 "{} values in model {}. Cannot be scaled".format(
1784 resource_name, kdu_model
1785 )
1786 )
1787 else:
1788 msg = (
1789 "replicaCount or replicas not found in the values"
1790 "in model {}. Cannot be scaled".format(kdu_model)
1791 )
1792 self.log.error(msg)
1793 raise K8sException(msg)
1794
1795 # Check whether replicas and replicaCount exist at the same time
1796 msg = "replicaCount and replicas exist at the same time"
1797 if duplicate_check:
1798 if "replicaCount" in kdu_values:
1799 self.log.error(msg)
1800 raise K8sException(msg)
1801 else:
1802 if "replicas" in kdu_values:
1803 self.log.error(msg)
1804 raise K8sException(msg)
1805
1806 return replicas, replica_str
1807
1808 async def _get_replica_count_instance(
1809 self,
1810 kdu_instance: str,
1811 namespace: str,
1812 kubeconfig: str,
1813 resource_name: str = None,
1814 ):
1815 """Get the replica count value in the instance.
1816
1817 Args:
1818 kdu_instance: The name of the KDU instance
1819 namespace: KDU instance namespace
1820 kubeconfig: Path to the kubeconfig file of the cluster
1821 resource_name: Resource name
1822
1823 Returns:
1824 The replica count found in the instance values, or None if not present
1825 """
1826
1827 kdu_values = yaml.load(
1828 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1829 Loader=yaml.SafeLoader,
1830 )
1831
1832 replicas = None
1833
1834 if kdu_values:
1835 resource_values = (
1836 kdu_values.get(resource_name, None) if resource_name else None
1837 )
1838 replicas = (
1839 (
1840 resource_values.get("replicaCount", None)
1841 or resource_values.get("replicas", None)
1842 )
1843 if resource_values
1844 else (
1845 kdu_values.get("replicaCount", None)
1846 or kdu_values.get("replicas", None)
1847 )
1848 )
1849
1850 return replicas
1851
1852 async def _store_status(
1853 self,
1854 cluster_id: str,
1855 operation: str,
1856 kdu_instance: str,
1857 namespace: str = None,
1858 db_dict: dict = None,
1859 ) -> None:
1860 """
1861 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1862
1863 :param cluster_id (str): the cluster where the KDU instance is deployed
1864 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1865 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1866 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1867 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1868 values for the keys:
1869 - "collection": The Mongo DB collection to write to
1870 - "filter": The query filter to use in the update process
1871 - "path": The dot separated keys which targets the object to be updated
1872 Defaults to None.
1873 """
1874
1875 try:
1876 detailed_status = await self._status_kdu(
1877 cluster_id=cluster_id,
1878 kdu_instance=kdu_instance,
1879 yaml_format=False,
1880 namespace=namespace,
1881 )
1882
1883 status = detailed_status.get("info").get("description")
1884 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1885
1886 # write status to db
1887 result = await self.write_app_status_to_db(
1888 db_dict=db_dict,
1889 status=str(status),
1890 detailed_status=str(detailed_status),
1891 operation=operation,
1892 )
1893
1894 if not result:
1895 self.log.info("Error writing in database. Task exiting...")
1896
1897 except asyncio.CancelledError as e:
1898 self.log.warning(
1899 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1900 )
1901 except Exception as e:
1902 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1903
1904 # params for use in -f file
1905 # returns values file option and filename (in order to delete it at the end)
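# values prefixed with "!!yaml " are parsed as YAML so that structured data
# (lists, dicts) can be passed through otherwise plain string parameters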
1906 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1907
1908 if params and len(params) > 0:
1909 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1910
1911 def get_random_number():
1912 r = random.randrange(start=1, stop=99999999)
1913 s = str(r)
1914 while len(s) < 10:
1915 s = "0" + s
1916 return s
1917
1918 params2 = dict()
1919 for key in params:
1920 value = params.get(key)
1921 if "!!yaml" in str(value):
1922 value = yaml.safe_load(value[7:])
1923 params2[key] = value
1924
1925 values_file = get_random_number() + ".yaml"
1926 with open(values_file, "w") as stream:
1927 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1928
1929 return "-f {}".format(values_file), values_file
1930
1931 return "", None
1932
1933 # params for use in --set option
1934 @staticmethod
1935 def _params_to_set_option(params: dict) -> str:
1936 params_str = ""
1937 if params and len(params) > 0:
1938 start = True
1939 for key in params:
1940 value = params.get(key, None)
1941 if value is not None:
1942 if start:
1943 params_str += "--set "
1944 start = False
1945 else:
1946 params_str += ","
1947 params_str += "{}={}".format(key, value)
1948 return params_str
1949
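# Builds a DNS-friendly Helm release name from the chart reference: strip any
# path/URL prefix, map every non-alphanumeric character to "-", truncate to 35
# characters, prefix "a" if it does not start with a letter, and append a
# 10-digit random suffix (staying within Helm's 53-character release name limit).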
1950 @staticmethod
1951 def generate_kdu_instance_name(**kwargs):
1952 chart_name = kwargs["kdu_model"]
1953 # check embedded chart (file or dir)
1954 if chart_name.startswith("/"):
1955 # extract file or directory name
1956 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1957 # check URL
1958 elif "://" in chart_name:
1959 # extract last portion of URL
1960 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1961
1962 name = ""
1963 for c in chart_name:
1964 if c.isalpha() or c.isnumeric():
1965 name += c
1966 else:
1967 name += "-"
1968 if len(name) > 35:
1969 name = name[0:35]
1970
1971 # if it does not start with an alpha character, prefix 'a'
1972 if not name[0].isalpha():
1973 name = "a" + name
1974
1975 name += "-"
1976
1977 def get_random_number():
1978 r = random.randrange(start=1, stop=99999999)
1979 s = str(r)
1980 s = s.rjust(10, "0")
1981 return s
1982
1983 name = name + get_random_number()
1984 return name.lower()
1985
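# OSM encodes an optional chart version in kdu_model as "<chart>:<version>"
# (not applicable when the chart is a local file); _split_version separates
# the chart reference from the version constraint.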
1986 def _split_version(self, kdu_model: str) -> (str, str):
1987 version = None
1988 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
1989 parts = kdu_model.split(sep=":")
1990 if len(parts) == 2:
1991 version = str(parts[1])
1992 kdu_model = parts[0]
1993 return kdu_model, version
1994
1995 async def _split_repo(self, kdu_model: str) -> str:
1996 repo_name = None
1997 idx = kdu_model.find("/")
1998 if idx >= 0:
1999 repo_name = kdu_model[:idx]
2000 return repo_name
2001
2002 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2003 """Obtain the Helm repository for an Helm Chart
2004
2005 Args:
2006 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2007 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2008
2009 Returns:
2010 str: the repository URL; if Helm Chart is a local one, the function returns None
2011 """
2012
2013 repo_url = None
2014 idx = kdu_model.find("/")
2015 if idx >= 0:
2016 repo_name = kdu_model[:idx]
2017 # Find repository link
2018 local_repo_list = await self.repo_list(cluster_uuid)
2019 for repo in local_repo_list:
2020 if repo["name"] == repo_name:
2021 repo_url = repo["url"]
2022 break
2023 return repo_url