Update the repo for a helm KDU before install and upgrade
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 def _get_namespace(self, cluster_uuid: str) -> str:
94 """
95 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 param: cluster_uuid: cluster's uuid
98 """
99
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
105
106 async def init_env(
107 self,
108 k8s_creds: str,
109 namespace: str = "kube-system",
110 reuse_cluster_uuid=None,
111 **kwargs,
112 ) -> (str, bool):
113 """
114 It prepares a given K8s cluster environment to run Charts
115
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 '.kube/config'
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
125 """
126
127 if reuse_cluster_uuid:
128 cluster_id = reuse_cluster_uuid
129 else:
130 cluster_id = str(uuid4())
131
132 self.log.debug(
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
134 )
135
136 paths, env = self._init_paths_env(
137 cluster_name=cluster_id, create_if_not_exist=True
138 )
139 mode = stat.S_IRUSR | stat.S_IWUSR
140 with open(paths["kube_config"], "w", mode) as f:
141 f.write(k8s_creds)
142 os.chmod(paths["kube_config"], 0o600)
143
144 # Code with initialization specific of helm version
145 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
146
147 # sync fs with local data
148 self.fs.reverse_sync(from_path=cluster_id)
149
150 self.log.info("Cluster {} initialized".format(cluster_id))
151
152 return cluster_id, n2vc_installed_sw
153
154 async def repo_add(
155 self,
156 cluster_uuid: str,
157 name: str,
158 url: str,
159 repo_type: str = "chart",
160 cert: str = None,
161 user: str = None,
162 password: str = None,
163 ):
164 self.log.debug(
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid, repo_type, name, url
167 )
168 )
169
170 # init_env
171 paths, env = self._init_paths_env(
172 cluster_name=cluster_uuid, create_if_not_exist=True
173 )
174
175 # sync local dir
176 self.fs.sync(from_path=cluster_uuid)
177
178 # helm repo add name url
179 command = ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths["kube_config"], self._helm_command, name, url
181 )
182
183 if cert:
184 temp_cert_file = os.path.join(
185 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
186 )
187 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
188 with open(temp_cert_file, "w") as the_cert:
189 the_cert.write(cert)
190 command += " --ca-file {}".format(temp_cert_file)
191
192 if user:
193 command += " --username={}".format(user)
194
195 if password:
196 command += " --password={}".format(password)
197
198 self.log.debug("adding repo: {}".format(command))
199 await self._local_async_exec(
200 command=command, raise_exception_on_error=True, env=env
201 )
202
203 # helm repo update
204 command = "env KUBECONFIG={} {} repo update {}".format(
205 paths["kube_config"], self._helm_command, name
206 )
207 self.log.debug("updating repo: {}".format(command))
208 await self._local_async_exec(
209 command=command, raise_exception_on_error=False, env=env
210 )
211
212 # sync fs
213 self.fs.reverse_sync(from_path=cluster_uuid)
214
215 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
216 self.log.debug(
217 "Cluster {}, updating {} repository {}".format(
218 cluster_uuid, repo_type, name
219 )
220 )
221
222 # init_env
223 paths, env = self._init_paths_env(
224 cluster_name=cluster_uuid, create_if_not_exist=True
225 )
226
227 # sync local dir
228 self.fs.sync(from_path=cluster_uuid)
229
230 # helm repo update
231 command = "{} repo update {}".format(self._helm_command, name)
232 self.log.debug("updating repo: {}".format(command))
233 await self._local_async_exec(
234 command=command, raise_exception_on_error=False, env=env
235 )
236
237 # sync fs
238 self.fs.reverse_sync(from_path=cluster_uuid)
239
240 async def repo_list(self, cluster_uuid: str) -> list:
241 """
242 Get the list of registered repositories
243
244 :return: list of registered repositories: [ (name, url) .... ]
245 """
246
247 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
248
249 # config filename
250 paths, env = self._init_paths_env(
251 cluster_name=cluster_uuid, create_if_not_exist=True
252 )
253
254 # sync local dir
255 self.fs.sync(from_path=cluster_uuid)
256
257 command = "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths["kube_config"], self._helm_command
259 )
260
261 # Set exception to false because if there are no repos just want an empty list
262 output, _rc = await self._local_async_exec(
263 command=command, raise_exception_on_error=False, env=env
264 )
265
266 # sync fs
267 self.fs.reverse_sync(from_path=cluster_uuid)
268
269 if _rc == 0:
270 if output and len(output) > 0:
271 repos = yaml.load(output, Loader=yaml.SafeLoader)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self._lower_keys_list(repos)
274 else:
275 return []
276 else:
277 return []
278
279 async def repo_remove(self, cluster_uuid: str, name: str):
280 self.log.debug(
281 "remove {} repositories for cluster {}".format(name, cluster_uuid)
282 )
283
284 # init env, paths
285 paths, env = self._init_paths_env(
286 cluster_name=cluster_uuid, create_if_not_exist=True
287 )
288
289 # sync local dir
290 self.fs.sync(from_path=cluster_uuid)
291
292 command = "env KUBECONFIG={} {} repo remove {}".format(
293 paths["kube_config"], self._helm_command, name
294 )
295 await self._local_async_exec(
296 command=command, raise_exception_on_error=True, env=env
297 )
298
299 # sync fs
300 self.fs.reverse_sync(from_path=cluster_uuid)
301
302 async def reset(
303 self,
304 cluster_uuid: str,
305 force: bool = False,
306 uninstall_sw: bool = False,
307 **kwargs,
308 ) -> bool:
309 """Reset a cluster
310
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
312
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
318 """
319 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
320 self.log.debug(
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid, uninstall_sw
323 )
324 )
325
326 # sync local dir
327 self.fs.sync(from_path=cluster_uuid)
328
329 # uninstall releases if needed.
330 if uninstall_sw:
331 releases = await self.instances_list(cluster_uuid=cluster_uuid)
332 if len(releases) > 0:
333 if force:
334 for r in releases:
335 try:
336 kdu_instance = r.get("name")
337 chart = r.get("chart")
338 self.log.debug(
339 "Uninstalling {} -> {}".format(chart, kdu_instance)
340 )
341 await self.uninstall(
342 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
343 )
344 except Exception as e:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
347 # raised an error
348 self.log.warn(
349 "Error uninstalling release {}: {}".format(
350 kdu_instance, e
351 )
352 )
353 else:
354 msg = (
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid)
357 self.log.warn(msg)
358 uninstall_sw = (
359 False # Allow to remove k8s cluster without removing Tiller
360 )
361
362 if uninstall_sw:
363 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
364
365 # delete cluster directory
366 self.log.debug("Removing directory {}".format(cluster_uuid))
367 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
368 # Remove also local directorio if still exist
369 direct = self.fs.path + "/" + cluster_uuid
370 shutil.rmtree(direct, ignore_errors=True)
371
372 return True
373
374 async def _install_impl(
375 self,
376 cluster_id: str,
377 kdu_model: str,
378 paths: dict,
379 env: dict,
380 kdu_instance: str,
381 atomic: bool = True,
382 timeout: float = 300,
383 params: dict = None,
384 db_dict: dict = None,
385 kdu_name: str = None,
386 namespace: str = None,
387 ):
388 # init env, paths
389 paths, env = self._init_paths_env(
390 cluster_name=cluster_id, create_if_not_exist=True
391 )
392
393 # params to str
394 params_str, file_to_delete = self._params_to_file_option(
395 cluster_id=cluster_id, params=params
396 )
397
398 # version
399 kdu_model, version = self._split_version(kdu_model)
400
401 repo = self._split_repo(kdu_model)
402 if repo:
403 self.repo_update(cluster_id, repo)
404
405 command = self._get_install_command(
406 kdu_model,
407 kdu_instance,
408 namespace,
409 params_str,
410 version,
411 atomic,
412 timeout,
413 paths["kube_config"],
414 )
415
416 self.log.debug("installing: {}".format(command))
417
418 if atomic:
419 # exec helm in a task
420 exec_task = asyncio.ensure_future(
421 coro_or_future=self._local_async_exec(
422 command=command, raise_exception_on_error=False, env=env
423 )
424 )
425
426 # write status in another task
427 status_task = asyncio.ensure_future(
428 coro_or_future=self._store_status(
429 cluster_id=cluster_id,
430 kdu_instance=kdu_instance,
431 namespace=namespace,
432 db_dict=db_dict,
433 operation="install",
434 run_once=False,
435 )
436 )
437
438 # wait for execution task
439 await asyncio.wait([exec_task])
440
441 # cancel status task
442 status_task.cancel()
443
444 output, rc = exec_task.result()
445
446 else:
447
448 output, rc = await self._local_async_exec(
449 command=command, raise_exception_on_error=False, env=env
450 )
451
452 # remove temporal values yaml file
453 if file_to_delete:
454 os.remove(file_to_delete)
455
456 # write final status
457 await self._store_status(
458 cluster_id=cluster_id,
459 kdu_instance=kdu_instance,
460 namespace=namespace,
461 db_dict=db_dict,
462 operation="install",
463 run_once=True,
464 check_every=0,
465 )
466
467 if rc != 0:
468 msg = "Error executing command: {}\nOutput: {}".format(command, output)
469 self.log.error(msg)
470 raise K8sException(msg)
471
472 async def upgrade(
473 self,
474 cluster_uuid: str,
475 kdu_instance: str,
476 kdu_model: str = None,
477 atomic: bool = True,
478 timeout: float = 300,
479 params: dict = None,
480 db_dict: dict = None,
481 ):
482 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
483
484 # sync local dir
485 self.fs.sync(from_path=cluster_uuid)
486
487 # look for instance to obtain namespace
488 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
489 if not instance_info:
490 raise K8sException("kdu_instance {} not found".format(kdu_instance))
491
492 # init env, paths
493 paths, env = self._init_paths_env(
494 cluster_name=cluster_uuid, create_if_not_exist=True
495 )
496
497 # sync local dir
498 self.fs.sync(from_path=cluster_uuid)
499
500 # params to str
501 params_str, file_to_delete = self._params_to_file_option(
502 cluster_id=cluster_uuid, params=params
503 )
504
505 # version
506 kdu_model, version = self._split_version(kdu_model)
507
508 repo = self._split_repo(kdu_model)
509 if repo:
510 self.repo_update(cluster_uuid, repo)
511
512 command = self._get_upgrade_command(
513 kdu_model,
514 kdu_instance,
515 instance_info["namespace"],
516 params_str,
517 version,
518 atomic,
519 timeout,
520 paths["kube_config"],
521 )
522
523 self.log.debug("upgrading: {}".format(command))
524
525 if atomic:
526
527 # exec helm in a task
528 exec_task = asyncio.ensure_future(
529 coro_or_future=self._local_async_exec(
530 command=command, raise_exception_on_error=False, env=env
531 )
532 )
533 # write status in another task
534 status_task = asyncio.ensure_future(
535 coro_or_future=self._store_status(
536 cluster_id=cluster_uuid,
537 kdu_instance=kdu_instance,
538 namespace=instance_info["namespace"],
539 db_dict=db_dict,
540 operation="upgrade",
541 run_once=False,
542 )
543 )
544
545 # wait for execution task
546 await asyncio.wait([exec_task])
547
548 # cancel status task
549 status_task.cancel()
550 output, rc = exec_task.result()
551
552 else:
553
554 output, rc = await self._local_async_exec(
555 command=command, raise_exception_on_error=False, env=env
556 )
557
558 # remove temporal values yaml file
559 if file_to_delete:
560 os.remove(file_to_delete)
561
562 # write final status
563 await self._store_status(
564 cluster_id=cluster_uuid,
565 kdu_instance=kdu_instance,
566 namespace=instance_info["namespace"],
567 db_dict=db_dict,
568 operation="upgrade",
569 run_once=True,
570 check_every=0,
571 )
572
573 if rc != 0:
574 msg = "Error executing command: {}\nOutput: {}".format(command, output)
575 self.log.error(msg)
576 raise K8sException(msg)
577
578 # sync fs
579 self.fs.reverse_sync(from_path=cluster_uuid)
580
581 # return new revision number
582 instance = await self.get_instance_info(
583 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
584 )
585 if instance:
586 revision = int(instance.get("revision"))
587 self.log.debug("New revision: {}".format(revision))
588 return revision
589 else:
590 return 0
591
592 async def scale(
593 self,
594 kdu_instance: str,
595 scale: int,
596 resource_name: str,
597 total_timeout: float = 1800,
598 cluster_uuid: str = None,
599 kdu_model: str = None,
600 atomic: bool = True,
601 db_dict: dict = None,
602 **kwargs,
603 ):
604 """Scale a resource in a Helm Chart.
605
606 Args:
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
617
618 Returns:
619 True if successful, False otherwise
620 """
621
622 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
623 if resource_name:
624 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
625 resource_name, kdu_model, cluster_uuid
626 )
627
628 self.log.debug(debug_mgs)
629
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
633 if not instance_info:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance))
635
636 # init env, paths
637 paths, env = self._init_paths_env(
638 cluster_name=cluster_uuid, create_if_not_exist=True
639 )
640
641 # version
642 kdu_model, version = self._split_version(kdu_model)
643
644 repo_url = await self._find_repo(kdu_model, cluster_uuid)
645 if not repo_url:
646 raise K8sException(
647 "Repository not found for kdu_model {}".format(kdu_model)
648 )
649
650 _, replica_str = await self._get_replica_count_url(
651 kdu_model, repo_url, resource_name
652 )
653
654 command = self._get_upgrade_scale_command(
655 kdu_model,
656 kdu_instance,
657 instance_info["namespace"],
658 scale,
659 version,
660 atomic,
661 replica_str,
662 total_timeout,
663 resource_name,
664 paths["kube_config"],
665 )
666
667 self.log.debug("scaling: {}".format(command))
668
669 if atomic:
670 # exec helm in a task
671 exec_task = asyncio.ensure_future(
672 coro_or_future=self._local_async_exec(
673 command=command, raise_exception_on_error=False, env=env
674 )
675 )
676 # write status in another task
677 status_task = asyncio.ensure_future(
678 coro_or_future=self._store_status(
679 cluster_id=cluster_uuid,
680 kdu_instance=kdu_instance,
681 namespace=instance_info["namespace"],
682 db_dict=db_dict,
683 operation="scale",
684 run_once=False,
685 )
686 )
687
688 # wait for execution task
689 await asyncio.wait([exec_task])
690
691 # cancel status task
692 status_task.cancel()
693 output, rc = exec_task.result()
694
695 else:
696 output, rc = await self._local_async_exec(
697 command=command, raise_exception_on_error=False, env=env
698 )
699
700 # write final status
701 await self._store_status(
702 cluster_id=cluster_uuid,
703 kdu_instance=kdu_instance,
704 namespace=instance_info["namespace"],
705 db_dict=db_dict,
706 operation="scale",
707 run_once=True,
708 check_every=0,
709 )
710
711 if rc != 0:
712 msg = "Error executing command: {}\nOutput: {}".format(command, output)
713 self.log.error(msg)
714 raise K8sException(msg)
715
716 # sync fs
717 self.fs.reverse_sync(from_path=cluster_uuid)
718
719 return True
720
721 async def get_scale_count(
722 self,
723 resource_name: str,
724 kdu_instance: str,
725 cluster_uuid: str,
726 kdu_model: str,
727 **kwargs,
728 ) -> int:
729 """Get a resource scale count.
730
731 Args:
732 cluster_uuid: The UUID of the cluster
733 resource_name: Resource name
734 kdu_instance: KDU instance name
735 kdu_model: The name or path of a bundle
736 kwargs: Additional parameters
737
738 Returns:
739 Resource instance count
740 """
741
742 self.log.debug(
743 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
744 )
745
746 # look for instance to obtain namespace
747 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
748 if not instance_info:
749 raise K8sException("kdu_instance {} not found".format(kdu_instance))
750
751 # init env, paths
752 paths, env = self._init_paths_env(
753 cluster_name=cluster_uuid, create_if_not_exist=True
754 )
755
756 replicas = await self._get_replica_count_instance(
757 kdu_instance, instance_info["namespace"], paths["kube_config"]
758 )
759
760 # Get default value if scale count is not found from provided values
761 if not replicas:
762 repo_url = await self._find_repo(kdu_model, cluster_uuid)
763 if not repo_url:
764 raise K8sException(
765 "Repository not found for kdu_model {}".format(kdu_model)
766 )
767
768 replicas, _ = await self._get_replica_count_url(
769 kdu_model, repo_url, resource_name
770 )
771
772 if not replicas:
773 msg = "Replica count not found. Cannot be scaled"
774 self.log.error(msg)
775 raise K8sException(msg)
776
777 return int(replicas)
778
779 async def rollback(
780 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
781 ):
782 self.log.debug(
783 "rollback kdu_instance {} to revision {} from cluster {}".format(
784 kdu_instance, revision, cluster_uuid
785 )
786 )
787
788 # sync local dir
789 self.fs.sync(from_path=cluster_uuid)
790
791 # look for instance to obtain namespace
792 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
793 if not instance_info:
794 raise K8sException("kdu_instance {} not found".format(kdu_instance))
795
796 # init env, paths
797 paths, env = self._init_paths_env(
798 cluster_name=cluster_uuid, create_if_not_exist=True
799 )
800
801 # sync local dir
802 self.fs.sync(from_path=cluster_uuid)
803
804 command = self._get_rollback_command(
805 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
806 )
807
808 self.log.debug("rolling_back: {}".format(command))
809
810 # exec helm in a task
811 exec_task = asyncio.ensure_future(
812 coro_or_future=self._local_async_exec(
813 command=command, raise_exception_on_error=False, env=env
814 )
815 )
816 # write status in another task
817 status_task = asyncio.ensure_future(
818 coro_or_future=self._store_status(
819 cluster_id=cluster_uuid,
820 kdu_instance=kdu_instance,
821 namespace=instance_info["namespace"],
822 db_dict=db_dict,
823 operation="rollback",
824 run_once=False,
825 )
826 )
827
828 # wait for execution task
829 await asyncio.wait([exec_task])
830
831 # cancel status task
832 status_task.cancel()
833
834 output, rc = exec_task.result()
835
836 # write final status
837 await self._store_status(
838 cluster_id=cluster_uuid,
839 kdu_instance=kdu_instance,
840 namespace=instance_info["namespace"],
841 db_dict=db_dict,
842 operation="rollback",
843 run_once=True,
844 check_every=0,
845 )
846
847 if rc != 0:
848 msg = "Error executing command: {}\nOutput: {}".format(command, output)
849 self.log.error(msg)
850 raise K8sException(msg)
851
852 # sync fs
853 self.fs.reverse_sync(from_path=cluster_uuid)
854
855 # return new revision number
856 instance = await self.get_instance_info(
857 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
858 )
859 if instance:
860 revision = int(instance.get("revision"))
861 self.log.debug("New revision: {}".format(revision))
862 return revision
863 else:
864 return 0
865
866 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
867 """
868 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
869 (this call should happen after all _terminate-config-primitive_ of the VNF
870 are invoked).
871
872 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
873 :param kdu_instance: unique name for the KDU instance to be deleted
874 :param kwargs: Additional parameters (None yet)
875 :return: True if successful
876 """
877
878 self.log.debug(
879 "uninstall kdu_instance {} from cluster {}".format(
880 kdu_instance, cluster_uuid
881 )
882 )
883
884 # sync local dir
885 self.fs.sync(from_path=cluster_uuid)
886
887 # look for instance to obtain namespace
888 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
889 if not instance_info:
890 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
891 return True
892 # init env, paths
893 paths, env = self._init_paths_env(
894 cluster_name=cluster_uuid, create_if_not_exist=True
895 )
896
897 # sync local dir
898 self.fs.sync(from_path=cluster_uuid)
899
900 command = self._get_uninstall_command(
901 kdu_instance, instance_info["namespace"], paths["kube_config"]
902 )
903 output, _rc = await self._local_async_exec(
904 command=command, raise_exception_on_error=True, env=env
905 )
906
907 # sync fs
908 self.fs.reverse_sync(from_path=cluster_uuid)
909
910 return self._output_to_table(output)
911
912 async def instances_list(self, cluster_uuid: str) -> list:
913 """
914 returns a list of deployed releases in a cluster
915
916 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
917 :return:
918 """
919
920 self.log.debug("list releases for cluster {}".format(cluster_uuid))
921
922 # sync local dir
923 self.fs.sync(from_path=cluster_uuid)
924
925 # execute internal command
926 result = await self._instances_list(cluster_uuid)
927
928 # sync fs
929 self.fs.reverse_sync(from_path=cluster_uuid)
930
931 return result
932
933 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
934 instances = await self.instances_list(cluster_uuid=cluster_uuid)
935 for instance in instances:
936 if instance.get("name") == kdu_instance:
937 return instance
938 self.log.debug("Instance {} not found".format(kdu_instance))
939 return None
940
941 async def exec_primitive(
942 self,
943 cluster_uuid: str = None,
944 kdu_instance: str = None,
945 primitive_name: str = None,
946 timeout: float = 300,
947 params: dict = None,
948 db_dict: dict = None,
949 **kwargs,
950 ) -> str:
951 """Exec primitive (Juju action)
952
953 :param cluster_uuid: The UUID of the cluster or namespace:cluster
954 :param kdu_instance: The unique name of the KDU instance
955 :param primitive_name: Name of action that will be executed
956 :param timeout: Timeout for action execution
957 :param params: Dictionary of all the parameters needed for the action
958 :db_dict: Dictionary for any additional data
959 :param kwargs: Additional parameters (None yet)
960
961 :return: Returns the output of the action
962 """
963 raise K8sException(
964 "KDUs deployed with Helm don't support actions "
965 "different from rollback, upgrade and status"
966 )
967
968 async def get_services(
969 self, cluster_uuid: str, kdu_instance: str, namespace: str
970 ) -> list:
971 """
972 Returns a list of services defined for the specified kdu instance.
973
974 :param cluster_uuid: UUID of a K8s cluster known by OSM
975 :param kdu_instance: unique name for the KDU instance
976 :param namespace: K8s namespace used by the KDU instance
977 :return: If successful, it will return a list of services, Each service
978 can have the following data:
979 - `name` of the service
980 - `type` type of service in the k8 cluster
981 - `ports` List of ports offered by the service, for each port includes at least
982 name, port, protocol
983 - `cluster_ip` Internal ip to be used inside k8s cluster
984 - `external_ip` List of external ips (in case they are available)
985 """
986
987 self.log.debug(
988 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
989 cluster_uuid, kdu_instance
990 )
991 )
992
993 # init env, paths
994 paths, env = self._init_paths_env(
995 cluster_name=cluster_uuid, create_if_not_exist=True
996 )
997
998 # sync local dir
999 self.fs.sync(from_path=cluster_uuid)
1000
1001 # get list of services names for kdu
1002 service_names = await self._get_services(
1003 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1004 )
1005
1006 service_list = []
1007 for service in service_names:
1008 service = await self._get_service(cluster_uuid, service, namespace)
1009 service_list.append(service)
1010
1011 # sync fs
1012 self.fs.reverse_sync(from_path=cluster_uuid)
1013
1014 return service_list
1015
1016 async def get_service(
1017 self, cluster_uuid: str, service_name: str, namespace: str
1018 ) -> object:
1019
1020 self.log.debug(
1021 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1022 service_name, namespace, cluster_uuid
1023 )
1024 )
1025
1026 # sync local dir
1027 self.fs.sync(from_path=cluster_uuid)
1028
1029 service = await self._get_service(cluster_uuid, service_name, namespace)
1030
1031 # sync fs
1032 self.fs.reverse_sync(from_path=cluster_uuid)
1033
1034 return service
1035
1036 async def status_kdu(
1037 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1038 ) -> Union[str, dict]:
1039 """
1040 This call would retrieve tha current state of a given KDU instance. It would be
1041 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1042 values_ of the configuration parameters applied to a given instance. This call
1043 would be based on the `status` call.
1044
1045 :param cluster_uuid: UUID of a K8s cluster known by OSM
1046 :param kdu_instance: unique name for the KDU instance
1047 :param kwargs: Additional parameters (None yet)
1048 :param yaml_format: if the return shall be returned as an YAML string or as a
1049 dictionary
1050 :return: If successful, it will return the following vector of arguments:
1051 - K8s `namespace` in the cluster where the KDU lives
1052 - `state` of the KDU instance. It can be:
1053 - UNKNOWN
1054 - DEPLOYED
1055 - DELETED
1056 - SUPERSEDED
1057 - FAILED or
1058 - DELETING
1059 - List of `resources` (objects) that this release consists of, sorted by kind,
1060 and the status of those resources
1061 - Last `deployment_time`.
1062
1063 """
1064 self.log.debug(
1065 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1066 cluster_uuid, kdu_instance
1067 )
1068 )
1069
1070 # sync local dir
1071 self.fs.sync(from_path=cluster_uuid)
1072
1073 # get instance: needed to obtain namespace
1074 instances = await self._instances_list(cluster_id=cluster_uuid)
1075 for instance in instances:
1076 if instance.get("name") == kdu_instance:
1077 break
1078 else:
1079 # instance does not exist
1080 raise K8sException(
1081 "Instance name: {} not found in cluster: {}".format(
1082 kdu_instance, cluster_uuid
1083 )
1084 )
1085
1086 status = await self._status_kdu(
1087 cluster_id=cluster_uuid,
1088 kdu_instance=kdu_instance,
1089 namespace=instance["namespace"],
1090 yaml_format=yaml_format,
1091 show_error_log=True,
1092 )
1093
1094 # sync fs
1095 self.fs.reverse_sync(from_path=cluster_uuid)
1096
1097 return status
1098
1099 async def get_values_kdu(
1100 self, kdu_instance: str, namespace: str, kubeconfig: str
1101 ) -> str:
1102
1103 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1104
1105 return await self._exec_get_command(
1106 get_command="values",
1107 kdu_instance=kdu_instance,
1108 namespace=namespace,
1109 kubeconfig=kubeconfig,
1110 )
1111
1112 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1113
1114 self.log.debug(
1115 "inspect kdu_model values {} from (optional) repo: {}".format(
1116 kdu_model, repo_url
1117 )
1118 )
1119
1120 return await self._exec_inspect_command(
1121 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1122 )
1123
1124 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1125
1126 self.log.debug(
1127 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1128 )
1129
1130 return await self._exec_inspect_command(
1131 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1132 )
1133
1134 async def synchronize_repos(self, cluster_uuid: str):
1135
1136 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1137 try:
1138 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1139 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1140
1141 local_repo_list = await self.repo_list(cluster_uuid)
1142 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1143
1144 deleted_repo_list = []
1145 added_repo_dict = {}
1146
1147 # iterate over the list of repos in the database that should be
1148 # added if not present
1149 for repo_name, db_repo in db_repo_dict.items():
1150 try:
1151 # check if it is already present
1152 curr_repo_url = local_repo_dict.get(db_repo["name"])
1153 repo_id = db_repo.get("_id")
1154 if curr_repo_url != db_repo["url"]:
1155 if curr_repo_url:
1156 self.log.debug(
1157 "repo {} url changed, delete and and again".format(
1158 db_repo["url"]
1159 )
1160 )
1161 await self.repo_remove(cluster_uuid, db_repo["name"])
1162 deleted_repo_list.append(repo_id)
1163
1164 # add repo
1165 self.log.debug("add repo {}".format(db_repo["name"]))
1166 if "ca_cert" in db_repo:
1167 await self.repo_add(
1168 cluster_uuid,
1169 db_repo["name"],
1170 db_repo["url"],
1171 cert=db_repo["ca_cert"],
1172 )
1173 else:
1174 await self.repo_add(
1175 cluster_uuid,
1176 db_repo["name"],
1177 db_repo["url"],
1178 )
1179 added_repo_dict[repo_id] = db_repo["name"]
1180 except Exception as e:
1181 raise K8sException(
1182 "Error adding repo id: {}, err_msg: {} ".format(
1183 repo_id, repr(e)
1184 )
1185 )
1186
1187 # Delete repos that are present but not in nbi_list
1188 for repo_name in local_repo_dict:
1189 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1190 self.log.debug("delete repo {}".format(repo_name))
1191 try:
1192 await self.repo_remove(cluster_uuid, repo_name)
1193 deleted_repo_list.append(repo_name)
1194 except Exception as e:
1195 self.warning(
1196 "Error deleting repo, name: {}, err_msg: {}".format(
1197 repo_name, str(e)
1198 )
1199 )
1200
1201 return deleted_repo_list, added_repo_dict
1202
1203 except K8sException:
1204 raise
1205 except Exception as e:
1206 # Do not raise errors synchronizing repos
1207 self.log.error("Error synchronizing repos: {}".format(e))
1208 raise Exception("Error synchronizing repos: {}".format(e))
1209
1210 def _get_db_repos_dict(self, repo_ids: list):
1211 db_repos_dict = {}
1212 for repo_id in repo_ids:
1213 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1214 db_repos_dict[db_repo["name"]] = db_repo
1215 return db_repos_dict
1216
1217 """
1218 ####################################################################################
1219 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1220 ####################################################################################
1221 """
1222
1223 @abc.abstractmethod
1224 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1225 """
1226 Creates and returns base cluster and kube dirs and returns them.
1227 Also created helm3 dirs according to new directory specification, paths are
1228 not returned but assigned to helm environment variables
1229
1230 :param cluster_name: cluster_name
1231 :return: Dictionary with config_paths and dictionary with helm environment variables
1232 """
1233
1234 @abc.abstractmethod
1235 async def _cluster_init(self, cluster_id, namespace, paths, env):
1236 """
1237 Implements the helm version dependent cluster initialization
1238 """
1239
1240 @abc.abstractmethod
1241 async def _instances_list(self, cluster_id):
1242 """
1243 Implements the helm version dependent helm instances list
1244 """
1245
1246 @abc.abstractmethod
1247 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1248 """
1249 Implements the helm version dependent method to obtain services from a helm instance
1250 """
1251
1252 @abc.abstractmethod
1253 async def _status_kdu(
1254 self,
1255 cluster_id: str,
1256 kdu_instance: str,
1257 namespace: str = None,
1258 yaml_format: bool = False,
1259 show_error_log: bool = False,
1260 ) -> Union[str, dict]:
1261 """
1262 Implements the helm version dependent method to obtain status of a helm instance
1263 """
1264
1265 @abc.abstractmethod
1266 def _get_install_command(
1267 self,
1268 kdu_model,
1269 kdu_instance,
1270 namespace,
1271 params_str,
1272 version,
1273 atomic,
1274 timeout,
1275 kubeconfig,
1276 ) -> str:
1277 """
1278 Obtain command to be executed to delete the indicated instance
1279 """
1280
1281 @abc.abstractmethod
1282 def _get_upgrade_scale_command(
1283 self,
1284 kdu_model,
1285 kdu_instance,
1286 namespace,
1287 count,
1288 version,
1289 atomic,
1290 replicas,
1291 timeout,
1292 resource_name,
1293 kubeconfig,
1294 ) -> str:
1295 """Obtain command to be executed to upgrade the indicated instance."""
1296
1297 @abc.abstractmethod
1298 def _get_upgrade_command(
1299 self,
1300 kdu_model,
1301 kdu_instance,
1302 namespace,
1303 params_str,
1304 version,
1305 atomic,
1306 timeout,
1307 kubeconfig,
1308 ) -> str:
1309 """
1310 Obtain command to be executed to upgrade the indicated instance
1311 """
1312
1313 @abc.abstractmethod
1314 def _get_rollback_command(
1315 self, kdu_instance, namespace, revision, kubeconfig
1316 ) -> str:
1317 """
1318 Obtain command to be executed to rollback the indicated instance
1319 """
1320
1321 @abc.abstractmethod
1322 def _get_uninstall_command(
1323 self, kdu_instance: str, namespace: str, kubeconfig: str
1324 ) -> str:
1325 """
1326 Obtain command to be executed to delete the indicated instance
1327 """
1328
1329 @abc.abstractmethod
1330 def _get_inspect_command(
1331 self, show_command: str, kdu_model: str, repo_str: str, version: str
1332 ):
1333 """
1334 Obtain command to be executed to obtain information about the kdu
1335 """
1336
1337 @abc.abstractmethod
1338 def _get_get_command(
1339 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1340 ):
1341 """Obtain command to be executed to get information about the kdu instance."""
1342
1343 @abc.abstractmethod
1344 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1345 """
1346 Method call to uninstall cluster software for helm. This method is dependent
1347 of helm version
1348 For Helm v2 it will be called when Tiller must be uninstalled
1349 For Helm v3 it does nothing and does not need to be callled
1350 """
1351
1352 @abc.abstractmethod
1353 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1354 """
1355 Obtains the cluster repos identifiers
1356 """
1357
1358 """
1359 ####################################################################################
1360 ################################### P R I V A T E ##################################
1361 ####################################################################################
1362 """
1363
1364 @staticmethod
1365 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1366 if os.path.exists(filename):
1367 return True
1368 else:
1369 msg = "File {} does not exist".format(filename)
1370 if exception_if_not_exists:
1371 raise K8sException(msg)
1372
1373 @staticmethod
1374 def _remove_multiple_spaces(strobj):
1375 strobj = strobj.strip()
1376 while " " in strobj:
1377 strobj = strobj.replace(" ", " ")
1378 return strobj
1379
1380 @staticmethod
1381 def _output_to_lines(output: str) -> list:
1382 output_lines = list()
1383 lines = output.splitlines(keepends=False)
1384 for line in lines:
1385 line = line.strip()
1386 if len(line) > 0:
1387 output_lines.append(line)
1388 return output_lines
1389
1390 @staticmethod
1391 def _output_to_table(output: str) -> list:
1392 output_table = list()
1393 lines = output.splitlines(keepends=False)
1394 for line in lines:
1395 line = line.replace("\t", " ")
1396 line_list = list()
1397 output_table.append(line_list)
1398 cells = line.split(sep=" ")
1399 for cell in cells:
1400 cell = cell.strip()
1401 if len(cell) > 0:
1402 line_list.append(cell)
1403 return output_table
1404
1405 @staticmethod
1406 def _parse_services(output: str) -> list:
1407 lines = output.splitlines(keepends=False)
1408 services = []
1409 for line in lines:
1410 line = line.replace("\t", " ")
1411 cells = line.split(sep=" ")
1412 if len(cells) > 0 and cells[0].startswith("service/"):
1413 elems = cells[0].split(sep="/")
1414 if len(elems) > 1:
1415 services.append(elems[1])
1416 return services
1417
1418 @staticmethod
1419 def _get_deep(dictionary: dict, members: tuple):
1420 target = dictionary
1421 value = None
1422 try:
1423 for m in members:
1424 value = target.get(m)
1425 if not value:
1426 return None
1427 else:
1428 target = value
1429 except Exception:
1430 pass
1431 return value
1432
1433 # find key:value in several lines
1434 @staticmethod
1435 def _find_in_lines(p_lines: list, p_key: str) -> str:
1436 for line in p_lines:
1437 try:
1438 if line.startswith(p_key + ":"):
1439 parts = line.split(":")
1440 the_value = parts[1].strip()
1441 return the_value
1442 except Exception:
1443 # ignore it
1444 pass
1445 return None
1446
1447 @staticmethod
1448 def _lower_keys_list(input_list: list):
1449 """
1450 Transform the keys in a list of dictionaries to lower case and returns a new list
1451 of dictionaries
1452 """
1453 new_list = []
1454 if input_list:
1455 for dictionary in input_list:
1456 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1457 new_list.append(new_dict)
1458 return new_list
1459
1460 async def _local_async_exec(
1461 self,
1462 command: str,
1463 raise_exception_on_error: bool = False,
1464 show_error_log: bool = True,
1465 encode_utf8: bool = False,
1466 env: dict = None,
1467 ) -> (str, int):
1468
1469 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1470 self.log.debug(
1471 "Executing async local command: {}, env: {}".format(command, env)
1472 )
1473
1474 # split command
1475 command = shlex.split(command)
1476
1477 environ = os.environ.copy()
1478 if env:
1479 environ.update(env)
1480
1481 try:
1482 process = await asyncio.create_subprocess_exec(
1483 *command,
1484 stdout=asyncio.subprocess.PIPE,
1485 stderr=asyncio.subprocess.PIPE,
1486 env=environ,
1487 )
1488
1489 # wait for command terminate
1490 stdout, stderr = await process.communicate()
1491
1492 return_code = process.returncode
1493
1494 output = ""
1495 if stdout:
1496 output = stdout.decode("utf-8").strip()
1497 # output = stdout.decode()
1498 if stderr:
1499 output = stderr.decode("utf-8").strip()
1500 # output = stderr.decode()
1501
1502 if return_code != 0 and show_error_log:
1503 self.log.debug(
1504 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1505 )
1506 else:
1507 self.log.debug("Return code: {}".format(return_code))
1508
1509 if raise_exception_on_error and return_code != 0:
1510 raise K8sException(output)
1511
1512 if encode_utf8:
1513 output = output.encode("utf-8").strip()
1514 output = str(output).replace("\\n", "\n")
1515
1516 return output, return_code
1517
1518 except asyncio.CancelledError:
1519 raise
1520 except K8sException:
1521 raise
1522 except Exception as e:
1523 msg = "Exception executing command: {} -> {}".format(command, e)
1524 self.log.error(msg)
1525 if raise_exception_on_error:
1526 raise K8sException(e) from e
1527 else:
1528 return "", -1
1529
1530 async def _local_async_exec_pipe(
1531 self,
1532 command1: str,
1533 command2: str,
1534 raise_exception_on_error: bool = True,
1535 show_error_log: bool = True,
1536 encode_utf8: bool = False,
1537 env: dict = None,
1538 ):
1539
1540 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1541 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1542 command = "{} | {}".format(command1, command2)
1543 self.log.debug(
1544 "Executing async local command: {}, env: {}".format(command, env)
1545 )
1546
1547 # split command
1548 command1 = shlex.split(command1)
1549 command2 = shlex.split(command2)
1550
1551 environ = os.environ.copy()
1552 if env:
1553 environ.update(env)
1554
1555 try:
1556 read, write = os.pipe()
1557 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1558 os.close(write)
1559 process_2 = await asyncio.create_subprocess_exec(
1560 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1561 )
1562 os.close(read)
1563 stdout, stderr = await process_2.communicate()
1564
1565 return_code = process_2.returncode
1566
1567 output = ""
1568 if stdout:
1569 output = stdout.decode("utf-8").strip()
1570 # output = stdout.decode()
1571 if stderr:
1572 output = stderr.decode("utf-8").strip()
1573 # output = stderr.decode()
1574
1575 if return_code != 0 and show_error_log:
1576 self.log.debug(
1577 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1578 )
1579 else:
1580 self.log.debug("Return code: {}".format(return_code))
1581
1582 if raise_exception_on_error and return_code != 0:
1583 raise K8sException(output)
1584
1585 if encode_utf8:
1586 output = output.encode("utf-8").strip()
1587 output = str(output).replace("\\n", "\n")
1588
1589 return output, return_code
1590 except asyncio.CancelledError:
1591 raise
1592 except K8sException:
1593 raise
1594 except Exception as e:
1595 msg = "Exception executing command: {} -> {}".format(command, e)
1596 self.log.error(msg)
1597 if raise_exception_on_error:
1598 raise K8sException(e) from e
1599 else:
1600 return "", -1
1601
1602 async def _get_service(self, cluster_id, service_name, namespace):
1603 """
1604 Obtains the data of the specified service in the k8cluster.
1605
1606 :param cluster_id: id of a K8s cluster known by OSM
1607 :param service_name: name of the K8s service in the specified namespace
1608 :param namespace: K8s namespace used by the KDU instance
1609 :return: If successful, it will return a service with the following data:
1610 - `name` of the service
1611 - `type` type of service in the k8 cluster
1612 - `ports` List of ports offered by the service, for each port includes at least
1613 name, port, protocol
1614 - `cluster_ip` Internal ip to be used inside k8s cluster
1615 - `external_ip` List of external ips (in case they are available)
1616 """
1617
1618 # init config, env
1619 paths, env = self._init_paths_env(
1620 cluster_name=cluster_id, create_if_not_exist=True
1621 )
1622
1623 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1624 self.kubectl_command, paths["kube_config"], namespace, service_name
1625 )
1626
1627 output, _rc = await self._local_async_exec(
1628 command=command, raise_exception_on_error=True, env=env
1629 )
1630
1631 data = yaml.load(output, Loader=yaml.SafeLoader)
1632
1633 service = {
1634 "name": service_name,
1635 "type": self._get_deep(data, ("spec", "type")),
1636 "ports": self._get_deep(data, ("spec", "ports")),
1637 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1638 }
1639 if service["type"] == "LoadBalancer":
1640 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1641 ip_list = [elem["ip"] for elem in ip_map_list]
1642 service["external_ip"] = ip_list
1643
1644 return service
1645
1646 async def _exec_get_command(
1647 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1648 ):
1649 """Obtains information about the kdu instance."""
1650
1651 full_command = self._get_get_command(
1652 get_command, kdu_instance, namespace, kubeconfig
1653 )
1654
1655 output, _rc = await self._local_async_exec(command=full_command)
1656
1657 return output
1658
1659 async def _exec_inspect_command(
1660 self, inspect_command: str, kdu_model: str, repo_url: str = None
1661 ):
1662 """Obtains information about a kdu, no cluster (no env)."""
1663
1664 repo_str = ""
1665 if repo_url:
1666 repo_str = " --repo {}".format(repo_url)
1667
1668 idx = kdu_model.find("/")
1669 if idx >= 0:
1670 idx += 1
1671 kdu_model = kdu_model[idx:]
1672
1673 kdu_model, version = self._split_version(kdu_model)
1674 if version:
1675 version_str = "--version {}".format(version)
1676 else:
1677 version_str = ""
1678
1679 full_command = self._get_inspect_command(
1680 inspect_command, kdu_model, repo_str, version_str
1681 )
1682
1683 output, _rc = await self._local_async_exec(command=full_command)
1684
1685 return output
1686
1687 async def _get_replica_count_url(
1688 self,
1689 kdu_model: str,
1690 repo_url: str,
1691 resource_name: str = None,
1692 ):
1693 """Get the replica count value in the Helm Chart Values.
1694
1695 Args:
1696 kdu_model: The name or path of a bundle
1697 repo_url: Helm Chart repository url
1698 resource_name: Resource name
1699
1700 Returns:
1701 True if replicas, False replicaCount
1702 """
1703
1704 kdu_values = yaml.load(
1705 await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
1706 )
1707
1708 if not kdu_values:
1709 raise K8sException(
1710 "kdu_values not found for kdu_model {}".format(kdu_model)
1711 )
1712
1713 if resource_name:
1714 kdu_values = kdu_values.get(resource_name, None)
1715
1716 if not kdu_values:
1717 msg = "resource {} not found in the values in model {}".format(
1718 resource_name, kdu_model
1719 )
1720 self.log.error(msg)
1721 raise K8sException(msg)
1722
1723 duplicate_check = False
1724
1725 replica_str = ""
1726 replicas = None
1727
1728 if kdu_values.get("replicaCount", None):
1729 replicas = kdu_values["replicaCount"]
1730 replica_str = "replicaCount"
1731 elif kdu_values.get("replicas", None):
1732 duplicate_check = True
1733 replicas = kdu_values["replicas"]
1734 replica_str = "replicas"
1735 else:
1736 if resource_name:
1737 msg = (
1738 "replicaCount or replicas not found in the resource"
1739 "{} values in model {}. Cannot be scaled".format(
1740 resource_name, kdu_model
1741 )
1742 )
1743 else:
1744 msg = (
1745 "replicaCount or replicas not found in the values"
1746 "in model {}. Cannot be scaled".format(kdu_model)
1747 )
1748 self.log.error(msg)
1749 raise K8sException(msg)
1750
1751 # Control if replicas and replicaCount exists at the same time
1752 msg = "replicaCount and replicas are exists at the same time"
1753 if duplicate_check:
1754 if "replicaCount" in kdu_values:
1755 self.log.error(msg)
1756 raise K8sException(msg)
1757 else:
1758 if "replicas" in kdu_values:
1759 self.log.error(msg)
1760 raise K8sException(msg)
1761
1762 return replicas, replica_str
1763
1764 async def _get_replica_count_instance(
1765 self,
1766 kdu_instance: str,
1767 namespace: str,
1768 kubeconfig: str,
1769 resource_name: str = None,
1770 ):
1771 """Get the replica count value in the instance.
1772
1773 Args:
1774 kdu_instance: The name of the KDU instance
1775 namespace: KDU instance namespace
1776 kubeconfig:
1777 resource_name: Resource name
1778
1779 Returns:
1780 True if replicas, False replicaCount
1781 """
1782
1783 kdu_values = yaml.load(
1784 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1785 Loader=yaml.SafeLoader,
1786 )
1787
1788 replicas = None
1789
1790 if kdu_values:
1791 resource_values = (
1792 kdu_values.get(resource_name, None) if resource_name else None
1793 )
1794 replicas = (
1795 (
1796 resource_values.get("replicaCount", None)
1797 or resource_values.get("replicas", None)
1798 )
1799 if resource_values
1800 else (
1801 kdu_values.get("replicaCount", None)
1802 or kdu_values.get("replicas", None)
1803 )
1804 )
1805
1806 return replicas
1807
1808 async def _store_status(
1809 self,
1810 cluster_id: str,
1811 operation: str,
1812 kdu_instance: str,
1813 namespace: str = None,
1814 check_every: float = 10,
1815 db_dict: dict = None,
1816 run_once: bool = False,
1817 ):
1818 while True:
1819 try:
1820 await asyncio.sleep(check_every)
1821 detailed_status = await self._status_kdu(
1822 cluster_id=cluster_id,
1823 kdu_instance=kdu_instance,
1824 yaml_format=False,
1825 namespace=namespace,
1826 )
1827 status = detailed_status.get("info").get("description")
1828 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1829 # write status to db
1830 result = await self.write_app_status_to_db(
1831 db_dict=db_dict,
1832 status=str(status),
1833 detailed_status=str(detailed_status),
1834 operation=operation,
1835 )
1836 if not result:
1837 self.log.info("Error writing in database. Task exiting...")
1838 return
1839 except asyncio.CancelledError:
1840 self.log.debug("Task cancelled")
1841 return
1842 except Exception as e:
1843 self.log.debug(
1844 "_store_status exception: {}".format(str(e)), exc_info=True
1845 )
1846 pass
1847 finally:
1848 if run_once:
1849 return
1850
1851 # params for use in -f file
1852 # returns values file option and filename (in order to delete it at the end)
1853 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1854
1855 if params and len(params) > 0:
1856 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1857
1858 def get_random_number():
1859 r = random.randrange(start=1, stop=99999999)
1860 s = str(r)
1861 while len(s) < 10:
1862 s = "0" + s
1863 return s
1864
1865 params2 = dict()
1866 for key in params:
1867 value = params.get(key)
1868 if "!!yaml" in str(value):
1869 value = yaml.load(value[7:])
1870 params2[key] = value
1871
1872 values_file = get_random_number() + ".yaml"
1873 with open(values_file, "w") as stream:
1874 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1875
1876 return "-f {}".format(values_file), values_file
1877
1878 return "", None
1879
1880 # params for use in --set option
1881 @staticmethod
1882 def _params_to_set_option(params: dict) -> str:
1883 params_str = ""
1884 if params and len(params) > 0:
1885 start = True
1886 for key in params:
1887 value = params.get(key, None)
1888 if value is not None:
1889 if start:
1890 params_str += "--set "
1891 start = False
1892 else:
1893 params_str += ","
1894 params_str += "{}={}".format(key, value)
1895 return params_str
1896
1897 @staticmethod
1898 def generate_kdu_instance_name(**kwargs):
1899 chart_name = kwargs["kdu_model"]
1900 # check embeded chart (file or dir)
1901 if chart_name.startswith("/"):
1902 # extract file or directory name
1903 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1904 # check URL
1905 elif "://" in chart_name:
1906 # extract last portion of URL
1907 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1908
1909 name = ""
1910 for c in chart_name:
1911 if c.isalpha() or c.isnumeric():
1912 name += c
1913 else:
1914 name += "-"
1915 if len(name) > 35:
1916 name = name[0:35]
1917
1918 # if does not start with alpha character, prefix 'a'
1919 if not name[0].isalpha():
1920 name = "a" + name
1921
1922 name += "-"
1923
1924 def get_random_number():
1925 r = random.randrange(start=1, stop=99999999)
1926 s = str(r)
1927 s = s.rjust(10, "0")
1928 return s
1929
1930 name = name + get_random_number()
1931 return name.lower()
1932
1933 def _split_version(self, kdu_model: str) -> (str, str):
1934 version = None
1935 if ":" in kdu_model:
1936 parts = kdu_model.split(sep=":")
1937 if len(parts) == 2:
1938 version = str(parts[1])
1939 kdu_model = parts[0]
1940 return kdu_model, version
1941
1942 async def _split_repo(self, kdu_model: str) -> str:
1943 repo_name = None
1944 idx = kdu_model.find("/")
1945 if idx >= 0:
1946 repo_name = kdu_model[:idx]
1947 return repo_name
1948
1949 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1950 repo_url = None
1951 idx = kdu_model.find("/")
1952 if idx >= 0:
1953 repo_name = kdu_model[:idx]
1954 # Find repository link
1955 local_repo_list = await self.repo_list(cluster_uuid)
1956 for repo in local_repo_list:
1957 repo_url = repo["url"] if repo["name"] == repo_name else None
1958 return repo_url