1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
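    # Illustrative construction sketch (comment only, not executed): this base class is
    # abstract, so a concrete helm-version connector subclass is normally instantiated
    # instead. The "fs", "db" and "logger" names below are hypothetical stand-ins for
    # the OSM common filesystem/database drivers and a logger.
    #
    #   conn = SomeHelmConnectorSubclass(   # hypothetical subclass name
    #       fs=fs,
    #       db=db,
    #       kubectl_command="/usr/bin/kubectl",
    #       helm_command="/usr/bin/helm",
    #       log=logger,
    #   )
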
93 def _get_namespace(self, cluster_uuid: str) -> str:
94 """
        Obtains the namespace used by the cluster whose uuid is passed as argument

        :param cluster_uuid: cluster's uuid
98 """
99
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
105
106 async def init_env(
107 self,
108 k8s_creds: str,
109 namespace: str = "kube-system",
110 reuse_cluster_uuid=None,
111 **kwargs,
112 ) -> (str, bool):
113 """
114 It prepares a given K8s cluster environment to run Charts
115
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 '.kube/config'
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
125 """
126
127 if reuse_cluster_uuid:
128 cluster_id = reuse_cluster_uuid
129 else:
130 cluster_id = str(uuid4())
131
132 self.log.debug(
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
134 )
135
136 paths, env = self._init_paths_env(
137 cluster_name=cluster_id, create_if_not_exist=True
138 )
        # the third positional argument of open() is buffering, not file mode,
        # so the owner-only permissions are applied with os.chmod() after writing
        mode = stat.S_IRUSR | stat.S_IWUSR
        with open(paths["kube_config"], "w") as f:
            f.write(k8s_creds)
        os.chmod(paths["kube_config"], mode)
143
144 # Code with initialization specific of helm version
145 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
146
147 # sync fs with local data
148 self.fs.reverse_sync(from_path=cluster_id)
149
150 self.log.info("Cluster {} initialized".format(cluster_id))
151
152 return cluster_id, n2vc_installed_sw
153
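    # Illustrative usage sketch (comment only): registering a cluster from a kubeconfig.
    # The path and the "conn" variable are placeholders, not part of this module.
    #
    #   with open("/path/to/kubeconfig") as f:
    #       creds = f.read()
    #   cluster_uuid, installed_sw = await conn.init_env(creds, namespace="kube-system")
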
154 async def repo_add(
155 self,
156 cluster_uuid: str,
157 name: str,
158 url: str,
159 repo_type: str = "chart",
160 cert: str = None,
161 user: str = None,
162 password: str = None,
163 ):
164 self.log.debug(
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid, repo_type, name, url
167 )
168 )
169
170 # init_env
171 paths, env = self._init_paths_env(
172 cluster_name=cluster_uuid, create_if_not_exist=True
173 )
174
175 # sync local dir
176 self.fs.sync(from_path=cluster_uuid)
177
178 # helm repo add name url
179 command = ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths["kube_config"], self._helm_command, name, url
181 )
182
183 if cert:
184 temp_cert_file = os.path.join(
185 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
186 )
187 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
188 with open(temp_cert_file, "w") as the_cert:
189 the_cert.write(cert)
190 command += " --ca-file {}".format(temp_cert_file)
191
192 if user:
193 command += " --username={}".format(user)
194
195 if password:
196 command += " --password={}".format(password)
197
198 self.log.debug("adding repo: {}".format(command))
199 await self._local_async_exec(
200 command=command, raise_exception_on_error=True, env=env
201 )
202
203 # helm repo update
204 command = "env KUBECONFIG={} {} repo update {}".format(
205 paths["kube_config"], self._helm_command, name
206 )
207 self.log.debug("updating repo: {}".format(command))
208 await self._local_async_exec(
209 command=command, raise_exception_on_error=False, env=env
210 )
211
212 # sync fs
213 self.fs.reverse_sync(from_path=cluster_uuid)
214
215 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
216 self.log.debug(
217 "Cluster {}, updating {} repository {}".format(
218 cluster_uuid, repo_type, name
219 )
220 )
221
222 # init_env
223 paths, env = self._init_paths_env(
224 cluster_name=cluster_uuid, create_if_not_exist=True
225 )
226
227 # sync local dir
228 self.fs.sync(from_path=cluster_uuid)
229
230 # helm repo update
231 command = "{} repo update {}".format(self._helm_command, name)
232 self.log.debug("updating repo: {}".format(command))
233 await self._local_async_exec(
234 command=command, raise_exception_on_error=False, env=env
235 )
236
237 # sync fs
238 self.fs.reverse_sync(from_path=cluster_uuid)
239
240 async def repo_list(self, cluster_uuid: str) -> list:
241 """
242 Get the list of registered repositories
243
244 :return: list of registered repositories: [ (name, url) .... ]
245 """
246
247 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
248
249 # config filename
250 paths, env = self._init_paths_env(
251 cluster_name=cluster_uuid, create_if_not_exist=True
252 )
253
254 # sync local dir
255 self.fs.sync(from_path=cluster_uuid)
256
257 command = "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths["kube_config"], self._helm_command
259 )
260
        # Set exception to false because if there are no repos we just want an empty list
262 output, _rc = await self._local_async_exec(
263 command=command, raise_exception_on_error=False, env=env
264 )
265
266 # sync fs
267 self.fs.reverse_sync(from_path=cluster_uuid)
268
269 if _rc == 0:
270 if output and len(output) > 0:
271 repos = yaml.load(output, Loader=yaml.SafeLoader)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self._lower_keys_list(repos)
274 else:
275 return []
276 else:
277 return []
278
279 async def repo_remove(self, cluster_uuid: str, name: str):
280 self.log.debug(
281 "remove {} repositories for cluster {}".format(name, cluster_uuid)
282 )
283
284 # init env, paths
285 paths, env = self._init_paths_env(
286 cluster_name=cluster_uuid, create_if_not_exist=True
287 )
288
289 # sync local dir
290 self.fs.sync(from_path=cluster_uuid)
291
292 command = "env KUBECONFIG={} {} repo remove {}".format(
293 paths["kube_config"], self._helm_command, name
294 )
295 await self._local_async_exec(
296 command=command, raise_exception_on_error=True, env=env
297 )
298
299 # sync fs
300 self.fs.reverse_sync(from_path=cluster_uuid)
301
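    # Illustrative repository management sketch (comment only): the cluster uuid,
    # repository name and URL below are examples, not values used by this module.
    #
    #   await conn.repo_add(cluster_uuid, "bitnami", "https://charts.bitnami.com/bitnami")
    #   repos = await conn.repo_list(cluster_uuid)   # e.g. [{"name": "bitnami", "url": "..."}]
    #   await conn.repo_remove(cluster_uuid, "bitnami")
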
302 async def reset(
303 self,
304 cluster_uuid: str,
305 force: bool = False,
306 uninstall_sw: bool = False,
307 **kwargs,
308 ) -> bool:
309 """Reset a cluster
310
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
312
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
318 """
319 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
320 self.log.debug(
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid, uninstall_sw
323 )
324 )
325
326 # sync local dir
327 self.fs.sync(from_path=cluster_uuid)
328
329 # uninstall releases if needed.
330 if uninstall_sw:
331 releases = await self.instances_list(cluster_uuid=cluster_uuid)
332 if len(releases) > 0:
333 if force:
334 for r in releases:
335 try:
336 kdu_instance = r.get("name")
337 chart = r.get("chart")
338 self.log.debug(
339 "Uninstalling {} -> {}".format(chart, kdu_instance)
340 )
341 await self.uninstall(
342 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
343 )
344 except Exception as e:
                            # do not raise an exception here: it was found that
                            # uninstalling some previously installed helm releases
                            # raised errors
348 self.log.warn(
349 "Error uninstalling release {}: {}".format(
350 kdu_instance, e
351 )
352 )
353 else:
354 msg = (
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid)
357 self.log.warn(msg)
358 uninstall_sw = (
359 False # Allow to remove k8s cluster without removing Tiller
360 )
361
362 if uninstall_sw:
363 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
364
365 # delete cluster directory
366 self.log.debug("Removing directory {}".format(cluster_uuid))
367 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
        # Also remove the local directory if it still exists
369 direct = self.fs.path + "/" + cluster_uuid
370 shutil.rmtree(direct, ignore_errors=True)
371
372 return True
373
374 def _is_helm_chart_a_file(self, chart_name: str):
375 return chart_name.count("/") > 1
376
377 async def _install_impl(
378 self,
379 cluster_id: str,
380 kdu_model: str,
381 paths: dict,
382 env: dict,
383 kdu_instance: str,
384 atomic: bool = True,
385 timeout: float = 300,
386 params: dict = None,
387 db_dict: dict = None,
388 kdu_name: str = None,
389 namespace: str = None,
390 ):
391 # init env, paths
392 paths, env = self._init_paths_env(
393 cluster_name=cluster_id, create_if_not_exist=True
394 )
395
396 # params to str
397 params_str, file_to_delete = self._params_to_file_option(
398 cluster_id=cluster_id, params=params
399 )
400
401 # version
402 kdu_model, version = self._split_version(kdu_model)
403
        # both calls are coroutines and must be awaited
        repo = await self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_id, repo)
407
408 command = self._get_install_command(
409 kdu_model,
410 kdu_instance,
411 namespace,
412 params_str,
413 version,
414 atomic,
415 timeout,
416 paths["kube_config"],
417 )
418
419 self.log.debug("installing: {}".format(command))
420
421 if atomic:
422 # exec helm in a task
423 exec_task = asyncio.ensure_future(
424 coro_or_future=self._local_async_exec(
425 command=command, raise_exception_on_error=False, env=env
426 )
427 )
428
429 # write status in another task
430 status_task = asyncio.ensure_future(
431 coro_or_future=self._store_status(
432 cluster_id=cluster_id,
433 kdu_instance=kdu_instance,
434 namespace=namespace,
435 db_dict=db_dict,
436 operation="install",
437 run_once=False,
438 )
439 )
440
441 # wait for execution task
442 await asyncio.wait([exec_task])
443
444 # cancel status task
445 status_task.cancel()
446
447 output, rc = exec_task.result()
448
449 else:
450
451 output, rc = await self._local_async_exec(
452 command=command, raise_exception_on_error=False, env=env
453 )
454
455 # remove temporal values yaml file
456 if file_to_delete:
457 os.remove(file_to_delete)
458
459 # write final status
460 await self._store_status(
461 cluster_id=cluster_id,
462 kdu_instance=kdu_instance,
463 namespace=namespace,
464 db_dict=db_dict,
465 operation="install",
466 run_once=True,
467 check_every=0,
468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
475 async def upgrade(
476 self,
477 cluster_uuid: str,
478 kdu_instance: str,
479 kdu_model: str = None,
480 atomic: bool = True,
481 timeout: float = 300,
482 params: dict = None,
483 db_dict: dict = None,
484 ):
485 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
486
487 # sync local dir
488 self.fs.sync(from_path=cluster_uuid)
489
490 # look for instance to obtain namespace
491 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
492 if not instance_info:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance))
494
495 # init env, paths
496 paths, env = self._init_paths_env(
497 cluster_name=cluster_uuid, create_if_not_exist=True
498 )
499
500 # sync local dir
501 self.fs.sync(from_path=cluster_uuid)
502
503 # params to str
504 params_str, file_to_delete = self._params_to_file_option(
505 cluster_id=cluster_uuid, params=params
506 )
507
508 # version
509 kdu_model, version = self._split_version(kdu_model)
510
        # both calls are coroutines and must be awaited
        repo = await self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_uuid, repo)
514
515 command = self._get_upgrade_command(
516 kdu_model,
517 kdu_instance,
518 instance_info["namespace"],
519 params_str,
520 version,
521 atomic,
522 timeout,
523 paths["kube_config"],
524 )
525
526 self.log.debug("upgrading: {}".format(command))
527
528 if atomic:
529
530 # exec helm in a task
531 exec_task = asyncio.ensure_future(
532 coro_or_future=self._local_async_exec(
533 command=command, raise_exception_on_error=False, env=env
534 )
535 )
536 # write status in another task
537 status_task = asyncio.ensure_future(
538 coro_or_future=self._store_status(
539 cluster_id=cluster_uuid,
540 kdu_instance=kdu_instance,
541 namespace=instance_info["namespace"],
542 db_dict=db_dict,
543 operation="upgrade",
544 run_once=False,
545 )
546 )
547
548 # wait for execution task
549 await asyncio.wait([exec_task])
550
551 # cancel status task
552 status_task.cancel()
553 output, rc = exec_task.result()
554
555 else:
556
557 output, rc = await self._local_async_exec(
558 command=command, raise_exception_on_error=False, env=env
559 )
560
561 # remove temporal values yaml file
562 if file_to_delete:
563 os.remove(file_to_delete)
564
565 # write final status
566 await self._store_status(
567 cluster_id=cluster_uuid,
568 kdu_instance=kdu_instance,
569 namespace=instance_info["namespace"],
570 db_dict=db_dict,
571 operation="upgrade",
572 run_once=True,
573 check_every=0,
574 )
575
576 if rc != 0:
577 msg = "Error executing command: {}\nOutput: {}".format(command, output)
578 self.log.error(msg)
579 raise K8sException(msg)
580
581 # sync fs
582 self.fs.reverse_sync(from_path=cluster_uuid)
583
584 # return new revision number
585 instance = await self.get_instance_info(
586 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
587 )
588 if instance:
589 revision = int(instance.get("revision"))
590 self.log.debug("New revision: {}".format(revision))
591 return revision
592 else:
593 return 0
594
595 async def scale(
596 self,
597 kdu_instance: str,
598 scale: int,
599 resource_name: str,
600 total_timeout: float = 1800,
601 cluster_uuid: str = None,
602 kdu_model: str = None,
603 atomic: bool = True,
604 db_dict: dict = None,
605 **kwargs,
606 ):
607 """Scale a resource in a Helm Chart.
608
609 Args:
610 kdu_instance: KDU instance name
611 scale: Scale to which to set the resource
612 resource_name: Resource name
613 total_timeout: The time, in seconds, to wait
614 cluster_uuid: The UUID of the cluster
615 kdu_model: The chart reference
616 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
617 The --wait flag will be set automatically if --atomic is used
618 db_dict: Dictionary for any additional data
619 kwargs: Additional parameters
620
621 Returns:
622 True if successful, False otherwise
623 """
624
        debug_msg = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_msg = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_msg)
632
633 # look for instance to obtain namespace
634 # get_instance_info function calls the sync command
635 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
636 if not instance_info:
637 raise K8sException("kdu_instance {} not found".format(kdu_instance))
638
639 # init env, paths
640 paths, env = self._init_paths_env(
641 cluster_name=cluster_uuid, create_if_not_exist=True
642 )
643
644 # version
645 kdu_model, version = self._split_version(kdu_model)
646
647 repo_url = await self._find_repo(kdu_model, cluster_uuid)
648
649 _, replica_str = await self._get_replica_count_url(
650 kdu_model, repo_url, resource_name
651 )
652
653 command = self._get_upgrade_scale_command(
654 kdu_model,
655 kdu_instance,
656 instance_info["namespace"],
657 scale,
658 version,
659 atomic,
660 replica_str,
661 total_timeout,
662 resource_name,
663 paths["kube_config"],
664 )
665
666 self.log.debug("scaling: {}".format(command))
667
668 if atomic:
669 # exec helm in a task
670 exec_task = asyncio.ensure_future(
671 coro_or_future=self._local_async_exec(
672 command=command, raise_exception_on_error=False, env=env
673 )
674 )
675 # write status in another task
676 status_task = asyncio.ensure_future(
677 coro_or_future=self._store_status(
678 cluster_id=cluster_uuid,
679 kdu_instance=kdu_instance,
680 namespace=instance_info["namespace"],
681 db_dict=db_dict,
682 operation="scale",
683 run_once=False,
684 )
685 )
686
687 # wait for execution task
688 await asyncio.wait([exec_task])
689
690 # cancel status task
691 status_task.cancel()
692 output, rc = exec_task.result()
693
694 else:
695 output, rc = await self._local_async_exec(
696 command=command, raise_exception_on_error=False, env=env
697 )
698
699 # write final status
700 await self._store_status(
701 cluster_id=cluster_uuid,
702 kdu_instance=kdu_instance,
703 namespace=instance_info["namespace"],
704 db_dict=db_dict,
705 operation="scale",
706 run_once=True,
707 check_every=0,
708 )
709
710 if rc != 0:
711 msg = "Error executing command: {}\nOutput: {}".format(command, output)
712 self.log.error(msg)
713 raise K8sException(msg)
714
715 # sync fs
716 self.fs.reverse_sync(from_path=cluster_uuid)
717
718 return True
719
720 async def get_scale_count(
721 self,
722 resource_name: str,
723 kdu_instance: str,
724 cluster_uuid: str,
725 kdu_model: str,
726 **kwargs,
727 ) -> int:
728 """Get a resource scale count.
729
730 Args:
731 cluster_uuid: The UUID of the cluster
732 resource_name: Resource name
733 kdu_instance: KDU instance name
            kdu_model: The name or path of a Helm Chart
735 kwargs: Additional parameters
736
737 Returns:
738 Resource instance count
739 """
740
741 self.log.debug(
742 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
743 )
744
745 # look for instance to obtain namespace
746 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
747 if not instance_info:
748 raise K8sException("kdu_instance {} not found".format(kdu_instance))
749
750 # init env, paths
751 paths, env = self._init_paths_env(
752 cluster_name=cluster_uuid, create_if_not_exist=True
753 )
754
755 replicas = await self._get_replica_count_instance(
756 kdu_instance, instance_info["namespace"], paths["kube_config"]
757 )
758
759 # Get default value if scale count is not found from provided values
760 if not replicas:
761 repo_url = await self._find_repo(
762 kdu_model=kdu_model, cluster_uuid=cluster_uuid
763 )
764 replicas, _ = await self._get_replica_count_url(
765 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
766 )
767
768 if not replicas:
769 msg = "Replica count not found. Cannot be scaled"
770 self.log.error(msg)
771 raise K8sException(msg)
772
773 return int(replicas)
774
775 async def rollback(
776 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
777 ):
778 self.log.debug(
779 "rollback kdu_instance {} to revision {} from cluster {}".format(
780 kdu_instance, revision, cluster_uuid
781 )
782 )
783
784 # sync local dir
785 self.fs.sync(from_path=cluster_uuid)
786
787 # look for instance to obtain namespace
788 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
789 if not instance_info:
790 raise K8sException("kdu_instance {} not found".format(kdu_instance))
791
792 # init env, paths
793 paths, env = self._init_paths_env(
794 cluster_name=cluster_uuid, create_if_not_exist=True
795 )
796
797 # sync local dir
798 self.fs.sync(from_path=cluster_uuid)
799
800 command = self._get_rollback_command(
801 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
802 )
803
804 self.log.debug("rolling_back: {}".format(command))
805
806 # exec helm in a task
807 exec_task = asyncio.ensure_future(
808 coro_or_future=self._local_async_exec(
809 command=command, raise_exception_on_error=False, env=env
810 )
811 )
812 # write status in another task
813 status_task = asyncio.ensure_future(
814 coro_or_future=self._store_status(
815 cluster_id=cluster_uuid,
816 kdu_instance=kdu_instance,
817 namespace=instance_info["namespace"],
818 db_dict=db_dict,
819 operation="rollback",
820 run_once=False,
821 )
822 )
823
824 # wait for execution task
825 await asyncio.wait([exec_task])
826
827 # cancel status task
828 status_task.cancel()
829
830 output, rc = exec_task.result()
831
832 # write final status
833 await self._store_status(
834 cluster_id=cluster_uuid,
835 kdu_instance=kdu_instance,
836 namespace=instance_info["namespace"],
837 db_dict=db_dict,
838 operation="rollback",
839 run_once=True,
840 check_every=0,
841 )
842
843 if rc != 0:
844 msg = "Error executing command: {}\nOutput: {}".format(command, output)
845 self.log.error(msg)
846 raise K8sException(msg)
847
848 # sync fs
849 self.fs.reverse_sync(from_path=cluster_uuid)
850
851 # return new revision number
852 instance = await self.get_instance_info(
853 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
854 )
855 if instance:
856 revision = int(instance.get("revision"))
857 self.log.debug("New revision: {}".format(revision))
858 return revision
859 else:
860 return 0
861
862 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
863 """
        Removes an existing KDU instance. It implicitly uses the `delete` or `uninstall` call
        (this call should happen after all _terminate-config-primitive_ of the VNF
        have been invoked).
867
868 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
869 :param kdu_instance: unique name for the KDU instance to be deleted
870 :param kwargs: Additional parameters (None yet)
871 :return: True if successful
872 """
873
874 self.log.debug(
875 "uninstall kdu_instance {} from cluster {}".format(
876 kdu_instance, cluster_uuid
877 )
878 )
879
880 # sync local dir
881 self.fs.sync(from_path=cluster_uuid)
882
883 # look for instance to obtain namespace
884 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
885 if not instance_info:
            self.log.warning("kdu_instance {} not found".format(kdu_instance))
887 return True
888 # init env, paths
889 paths, env = self._init_paths_env(
890 cluster_name=cluster_uuid, create_if_not_exist=True
891 )
892
893 # sync local dir
894 self.fs.sync(from_path=cluster_uuid)
895
896 command = self._get_uninstall_command(
897 kdu_instance, instance_info["namespace"], paths["kube_config"]
898 )
899 output, _rc = await self._local_async_exec(
900 command=command, raise_exception_on_error=True, env=env
901 )
902
903 # sync fs
904 self.fs.reverse_sync(from_path=cluster_uuid)
905
906 return self._output_to_table(output)
907
908 async def instances_list(self, cluster_uuid: str) -> list:
909 """
910 returns a list of deployed releases in a cluster
911
912 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
913 :return:
914 """
915
916 self.log.debug("list releases for cluster {}".format(cluster_uuid))
917
918 # sync local dir
919 self.fs.sync(from_path=cluster_uuid)
920
921 # execute internal command
922 result = await self._instances_list(cluster_uuid)
923
924 # sync fs
925 self.fs.reverse_sync(from_path=cluster_uuid)
926
927 return result
928
929 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
930 instances = await self.instances_list(cluster_uuid=cluster_uuid)
931 for instance in instances:
932 if instance.get("name") == kdu_instance:
933 return instance
934 self.log.debug("Instance {} not found".format(kdu_instance))
935 return None
936
937 async def upgrade_charm(
938 self,
939 ee_id: str = None,
940 path: str = None,
941 charm_id: str = None,
942 charm_type: str = None,
943 timeout: float = None,
944 ) -> str:
945 """This method upgrade charms in VNFs
946
947 Args:
948 ee_id: Execution environment id
949 path: Local path to the charm
950 charm_id: charm-id
951 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
952 timeout: (Float) Timeout for the ns update operation
953
954 Returns:
955 The output of the update operation if status equals to "completed"
956 """
957 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
958
959 async def exec_primitive(
960 self,
961 cluster_uuid: str = None,
962 kdu_instance: str = None,
963 primitive_name: str = None,
964 timeout: float = 300,
965 params: dict = None,
966 db_dict: dict = None,
967 **kwargs,
968 ) -> str:
969 """Exec primitive (Juju action)
970
971 :param cluster_uuid: The UUID of the cluster or namespace:cluster
972 :param kdu_instance: The unique name of the KDU instance
973 :param primitive_name: Name of action that will be executed
974 :param timeout: Timeout for action execution
975 :param params: Dictionary of all the parameters needed for the action
976 :db_dict: Dictionary for any additional data
977 :param kwargs: Additional parameters (None yet)
978
979 :return: Returns the output of the action
980 """
981 raise K8sException(
982 "KDUs deployed with Helm don't support actions "
983 "different from rollback, upgrade and status"
984 )
985
986 async def get_services(
987 self, cluster_uuid: str, kdu_instance: str, namespace: str
988 ) -> list:
989 """
990 Returns a list of services defined for the specified kdu instance.
991
992 :param cluster_uuid: UUID of a K8s cluster known by OSM
993 :param kdu_instance: unique name for the KDU instance
994 :param namespace: K8s namespace used by the KDU instance
995 :return: If successful, it will return a list of services, Each service
996 can have the following data:
997 - `name` of the service
998 - `type` type of service in the k8 cluster
999 - `ports` List of ports offered by the service, for each port includes at least
1000 name, port, protocol
1001 - `cluster_ip` Internal ip to be used inside k8s cluster
1002 - `external_ip` List of external ips (in case they are available)
1003 """
1004
1005 self.log.debug(
1006 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1007 cluster_uuid, kdu_instance
1008 )
1009 )
1010
1011 # init env, paths
1012 paths, env = self._init_paths_env(
1013 cluster_name=cluster_uuid, create_if_not_exist=True
1014 )
1015
1016 # sync local dir
1017 self.fs.sync(from_path=cluster_uuid)
1018
1019 # get list of services names for kdu
1020 service_names = await self._get_services(
1021 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1022 )
1023
1024 service_list = []
1025 for service in service_names:
1026 service = await self._get_service(cluster_uuid, service, namespace)
1027 service_list.append(service)
1028
1029 # sync fs
1030 self.fs.reverse_sync(from_path=cluster_uuid)
1031
1032 return service_list
1033
1034 async def get_service(
1035 self, cluster_uuid: str, service_name: str, namespace: str
1036 ) -> object:
1037
1038 self.log.debug(
1039 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1040 service_name, namespace, cluster_uuid
1041 )
1042 )
1043
1044 # sync local dir
1045 self.fs.sync(from_path=cluster_uuid)
1046
1047 service = await self._get_service(cluster_uuid, service_name, namespace)
1048
1049 # sync fs
1050 self.fs.reverse_sync(from_path=cluster_uuid)
1051
1052 return service
1053
1054 async def status_kdu(
        self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
1056 ) -> Union[str, dict]:
1057 """
        This call retrieves the current state of a given KDU instance. It allows to
        retrieve the _composition_ (i.e. K8s objects) and _specific values_ of the
        configuration parameters applied to a given instance. This call is based on
        the `status` call.
1062
1063 :param cluster_uuid: UUID of a K8s cluster known by OSM
1064 :param kdu_instance: unique name for the KDU instance
1065 :param kwargs: Additional parameters (None yet)
        :param yaml_format: whether the result shall be returned as a YAML string or as a
        dictionary
1068 :return: If successful, it will return the following vector of arguments:
1069 - K8s `namespace` in the cluster where the KDU lives
1070 - `state` of the KDU instance. It can be:
1071 - UNKNOWN
1072 - DEPLOYED
1073 - DELETED
1074 - SUPERSEDED
1075 - FAILED or
1076 - DELETING
1077 - List of `resources` (objects) that this release consists of, sorted by kind,
1078 and the status of those resources
1079 - Last `deployment_time`.
1080
1081 """
1082 self.log.debug(
1083 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1084 cluster_uuid, kdu_instance
1085 )
1086 )
1087
1088 # sync local dir
1089 self.fs.sync(from_path=cluster_uuid)
1090
1091 # get instance: needed to obtain namespace
1092 instances = await self._instances_list(cluster_id=cluster_uuid)
1093 for instance in instances:
1094 if instance.get("name") == kdu_instance:
1095 break
1096 else:
1097 # instance does not exist
1098 raise K8sException(
1099 "Instance name: {} not found in cluster: {}".format(
1100 kdu_instance, cluster_uuid
1101 )
1102 )
1103
1104 status = await self._status_kdu(
1105 cluster_id=cluster_uuid,
1106 kdu_instance=kdu_instance,
1107 namespace=instance["namespace"],
1108 yaml_format=yaml_format,
1109 show_error_log=True,
1110 )
1111
1112 # sync fs
1113 self.fs.reverse_sync(from_path=cluster_uuid)
1114
1115 return status
1116
1117 async def get_values_kdu(
1118 self, kdu_instance: str, namespace: str, kubeconfig: str
1119 ) -> str:
1120
1121 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1122
1123 return await self._exec_get_command(
1124 get_command="values",
1125 kdu_instance=kdu_instance,
1126 namespace=namespace,
1127 kubeconfig=kubeconfig,
1128 )
1129
1130 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1131 """Method to obtain the Helm Chart package's values
1132
1133 Args:
            kdu_model: The name or path of a Helm Chart
1135 repo_url: Helm Chart repository url
1136
1137 Returns:
1138 str: the values of the Helm Chart package
1139 """
1140
1141 self.log.debug(
1142 "inspect kdu_model values {} from (optional) repo: {}".format(
1143 kdu_model, repo_url
1144 )
1145 )
1146
1147 return await self._exec_inspect_command(
1148 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1149 )
1150
1151 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1152
1153 self.log.debug(
1154 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1155 )
1156
1157 return await self._exec_inspect_command(
1158 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1159 )
1160
1161 async def synchronize_repos(self, cluster_uuid: str):
1162
1163 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1164 try:
1165 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1166 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1167
1168 local_repo_list = await self.repo_list(cluster_uuid)
1169 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1170
1171 deleted_repo_list = []
1172 added_repo_dict = {}
1173
1174 # iterate over the list of repos in the database that should be
1175 # added if not present
1176 for repo_name, db_repo in db_repo_dict.items():
1177 try:
1178 # check if it is already present
1179 curr_repo_url = local_repo_dict.get(db_repo["name"])
1180 repo_id = db_repo.get("_id")
1181 if curr_repo_url != db_repo["url"]:
1182 if curr_repo_url:
1183 self.log.debug(
1184 "repo {} url changed, delete and and again".format(
1185 db_repo["url"]
1186 )
1187 )
1188 await self.repo_remove(cluster_uuid, db_repo["name"])
1189 deleted_repo_list.append(repo_id)
1190
1191 # add repo
1192 self.log.debug("add repo {}".format(db_repo["name"]))
1193 if "ca_cert" in db_repo:
1194 await self.repo_add(
1195 cluster_uuid,
1196 db_repo["name"],
1197 db_repo["url"],
1198 cert=db_repo["ca_cert"],
1199 )
1200 else:
1201 await self.repo_add(
1202 cluster_uuid,
1203 db_repo["name"],
1204 db_repo["url"],
1205 )
1206 added_repo_dict[repo_id] = db_repo["name"]
1207 except Exception as e:
1208 raise K8sException(
1209 "Error adding repo id: {}, err_msg: {} ".format(
1210 repo_id, repr(e)
1211 )
1212 )
1213
1214 # Delete repos that are present but not in nbi_list
1215 for repo_name in local_repo_dict:
1216 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1217 self.log.debug("delete repo {}".format(repo_name))
1218 try:
1219 await self.repo_remove(cluster_uuid, repo_name)
1220 deleted_repo_list.append(repo_name)
1221 except Exception as e:
                        self.log.warning(
1223 "Error deleting repo, name: {}, err_msg: {}".format(
1224 repo_name, str(e)
1225 )
1226 )
1227
1228 return deleted_repo_list, added_repo_dict
1229
1230 except K8sException:
1231 raise
1232 except Exception as e:
1233 # Do not raise errors synchronizing repos
1234 self.log.error("Error synchronizing repos: {}".format(e))
1235 raise Exception("Error synchronizing repos: {}".format(e))
1236
1237 def _get_db_repos_dict(self, repo_ids: list):
1238 db_repos_dict = {}
1239 for repo_id in repo_ids:
1240 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1241 db_repos_dict[db_repo["name"]] = db_repo
1242 return db_repos_dict
1243
1244 """
1245 ####################################################################################
1246 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1247 ####################################################################################
1248 """
1249
1250 @abc.abstractmethod
1251 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1252 """
        Creates and returns the base cluster and kube dirs.
        Also creates the helm3 dirs according to the new directory specification; these
        paths are not returned but assigned to helm environment variables
1256
1257 :param cluster_name: cluster_name
1258 :return: Dictionary with config_paths and dictionary with helm environment variables
1259 """
1260
1261 @abc.abstractmethod
1262 async def _cluster_init(self, cluster_id, namespace, paths, env):
1263 """
1264 Implements the helm version dependent cluster initialization
1265 """
1266
1267 @abc.abstractmethod
1268 async def _instances_list(self, cluster_id):
1269 """
1270 Implements the helm version dependent helm instances list
1271 """
1272
1273 @abc.abstractmethod
1274 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1275 """
1276 Implements the helm version dependent method to obtain services from a helm instance
1277 """
1278
1279 @abc.abstractmethod
1280 async def _status_kdu(
1281 self,
1282 cluster_id: str,
1283 kdu_instance: str,
1284 namespace: str = None,
1285 yaml_format: bool = False,
1286 show_error_log: bool = False,
1287 ) -> Union[str, dict]:
1288 """
1289 Implements the helm version dependent method to obtain status of a helm instance
1290 """
1291
1292 @abc.abstractmethod
1293 def _get_install_command(
1294 self,
1295 kdu_model,
1296 kdu_instance,
1297 namespace,
1298 params_str,
1299 version,
1300 atomic,
1301 timeout,
1302 kubeconfig,
1303 ) -> str:
1304 """
        Obtain command to be executed to install the indicated instance
1306 """
1307
1308 @abc.abstractmethod
1309 def _get_upgrade_scale_command(
1310 self,
1311 kdu_model,
1312 kdu_instance,
1313 namespace,
1314 count,
1315 version,
1316 atomic,
1317 replicas,
1318 timeout,
1319 resource_name,
1320 kubeconfig,
1321 ) -> str:
1322 """Obtain command to be executed to upgrade the indicated instance."""
1323
1324 @abc.abstractmethod
1325 def _get_upgrade_command(
1326 self,
1327 kdu_model,
1328 kdu_instance,
1329 namespace,
1330 params_str,
1331 version,
1332 atomic,
1333 timeout,
1334 kubeconfig,
1335 ) -> str:
1336 """
1337 Obtain command to be executed to upgrade the indicated instance
1338 """
1339
1340 @abc.abstractmethod
1341 def _get_rollback_command(
1342 self, kdu_instance, namespace, revision, kubeconfig
1343 ) -> str:
1344 """
1345 Obtain command to be executed to rollback the indicated instance
1346 """
1347
1348 @abc.abstractmethod
1349 def _get_uninstall_command(
1350 self, kdu_instance: str, namespace: str, kubeconfig: str
1351 ) -> str:
1352 """
1353 Obtain command to be executed to delete the indicated instance
1354 """
1355
1356 @abc.abstractmethod
1357 def _get_inspect_command(
1358 self, show_command: str, kdu_model: str, repo_str: str, version: str
1359 ):
1360 """Generates the command to obtain the information about an Helm Chart package
1361 (´helm show ...´ command)
1362
1363 Args:
1364 show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of a Helm Chart
            repo_str: Helm Chart repository option string (e.g. " --repo <url>", or empty)
1367 version: constraint with specific version of the Chart to use
1368
1369 Returns:
1370 str: the generated Helm Chart command
1371 """
1372
1373 @abc.abstractmethod
1374 def _get_get_command(
1375 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1376 ):
1377 """Obtain command to be executed to get information about the kdu instance."""
1378
1379 @abc.abstractmethod
1380 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1381 """
        Method called to uninstall cluster software for helm. This method depends on
        the helm version.
        For Helm v2 it will be called when Tiller must be uninstalled.
        For Helm v3 it does nothing and does not need to be called.
1386 """
1387
1388 @abc.abstractmethod
1389 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1390 """
1391 Obtains the cluster repos identifiers
1392 """
1393
1394 """
1395 ####################################################################################
1396 ################################### P R I V A T E ##################################
1397 ####################################################################################
1398 """
1399
1400 @staticmethod
1401 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1402 if os.path.exists(filename):
1403 return True
1404 else:
1405 msg = "File {} does not exist".format(filename)
1406 if exception_if_not_exists:
1407 raise K8sException(msg)
1408
1409 @staticmethod
1410 def _remove_multiple_spaces(strobj):
1411 strobj = strobj.strip()
1412 while " " in strobj:
1413 strobj = strobj.replace(" ", " ")
1414 return strobj
1415
1416 @staticmethod
1417 def _output_to_lines(output: str) -> list:
1418 output_lines = list()
1419 lines = output.splitlines(keepends=False)
1420 for line in lines:
1421 line = line.strip()
1422 if len(line) > 0:
1423 output_lines.append(line)
1424 return output_lines
1425
1426 @staticmethod
1427 def _output_to_table(output: str) -> list:
1428 output_table = list()
1429 lines = output.splitlines(keepends=False)
1430 for line in lines:
1431 line = line.replace("\t", " ")
1432 line_list = list()
1433 output_table.append(line_list)
1434 cells = line.split(sep=" ")
1435 for cell in cells:
1436 cell = cell.strip()
1437 if len(cell) > 0:
1438 line_list.append(cell)
1439 return output_table
1440
1441 @staticmethod
1442 def _parse_services(output: str) -> list:
1443 lines = output.splitlines(keepends=False)
1444 services = []
1445 for line in lines:
1446 line = line.replace("\t", " ")
1447 cells = line.split(sep=" ")
1448 if len(cells) > 0 and cells[0].startswith("service/"):
1449 elems = cells[0].split(sep="/")
1450 if len(elems) > 1:
1451 services.append(elems[1])
1452 return services
1453
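    # Illustrative example (comment only) of the parsing done by _parse_services,
    # assuming typical "kubectl get" output lines:
    #
    #   _parse_services("service/myapp-svc   ClusterIP   10.0.0.1   <none>   80/TCP")
    #   -> ["myapp-svc"]
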
1454 @staticmethod
1455 def _get_deep(dictionary: dict, members: tuple):
1456 target = dictionary
1457 value = None
1458 try:
1459 for m in members:
1460 value = target.get(m)
1461 if not value:
1462 return None
1463 else:
1464 target = value
1465 except Exception:
1466 pass
1467 return value
1468
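    # Illustrative example (comment only) of the nested lookup done by _get_deep:
    #
    #   _get_deep({"spec": {"type": "ClusterIP"}}, ("spec", "type"))  -> "ClusterIP"
    #   _get_deep({"spec": {}}, ("spec", "type"))                     -> None
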
1469 # find key:value in several lines
1470 @staticmethod
1471 def _find_in_lines(p_lines: list, p_key: str) -> str:
1472 for line in p_lines:
1473 try:
1474 if line.startswith(p_key + ":"):
1475 parts = line.split(":")
1476 the_value = parts[1].strip()
1477 return the_value
1478 except Exception:
1479 # ignore it
1480 pass
1481 return None
1482
1483 @staticmethod
1484 def _lower_keys_list(input_list: list):
1485 """
1486 Transform the keys in a list of dictionaries to lower case and returns a new list
1487 of dictionaries
1488 """
1489 new_list = []
1490 if input_list:
1491 for dictionary in input_list:
1492 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1493 new_list.append(new_dict)
1494 return new_list
1495
1496 async def _local_async_exec(
1497 self,
1498 command: str,
1499 raise_exception_on_error: bool = False,
1500 show_error_log: bool = True,
1501 encode_utf8: bool = False,
1502 env: dict = None,
1503 ) -> (str, int):
1504
1505 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1506 self.log.debug(
1507 "Executing async local command: {}, env: {}".format(command, env)
1508 )
1509
1510 # split command
1511 command = shlex.split(command)
1512
1513 environ = os.environ.copy()
1514 if env:
1515 environ.update(env)
1516
1517 try:
1518 process = await asyncio.create_subprocess_exec(
1519 *command,
1520 stdout=asyncio.subprocess.PIPE,
1521 stderr=asyncio.subprocess.PIPE,
1522 env=environ,
1523 )
1524
1525 # wait for command terminate
1526 stdout, stderr = await process.communicate()
1527
1528 return_code = process.returncode
1529
1530 output = ""
1531 if stdout:
1532 output = stdout.decode("utf-8").strip()
1533 # output = stdout.decode()
1534 if stderr:
1535 output = stderr.decode("utf-8").strip()
1536 # output = stderr.decode()
1537
1538 if return_code != 0 and show_error_log:
1539 self.log.debug(
1540 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1541 )
1542 else:
1543 self.log.debug("Return code: {}".format(return_code))
1544
1545 if raise_exception_on_error and return_code != 0:
1546 raise K8sException(output)
1547
1548 if encode_utf8:
1549 output = output.encode("utf-8").strip()
1550 output = str(output).replace("\\n", "\n")
1551
1552 return output, return_code
1553
1554 except asyncio.CancelledError:
1555 raise
1556 except K8sException:
1557 raise
1558 except Exception as e:
1559 msg = "Exception executing command: {} -> {}".format(command, e)
1560 self.log.error(msg)
1561 if raise_exception_on_error:
1562 raise K8sException(e) from e
1563 else:
1564 return "", -1
1565
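    # Illustrative usage sketch (comment only) of the helper above; the command and
    # env values are examples:
    #
    #   output, rc = await self._local_async_exec(
    #       command="helm version", raise_exception_on_error=False, env=env
    #   )
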
1566 async def _local_async_exec_pipe(
1567 self,
1568 command1: str,
1569 command2: str,
1570 raise_exception_on_error: bool = True,
1571 show_error_log: bool = True,
1572 encode_utf8: bool = False,
1573 env: dict = None,
1574 ):
1575
1576 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1577 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1578 command = "{} | {}".format(command1, command2)
1579 self.log.debug(
1580 "Executing async local command: {}, env: {}".format(command, env)
1581 )
1582
1583 # split command
1584 command1 = shlex.split(command1)
1585 command2 = shlex.split(command2)
1586
1587 environ = os.environ.copy()
1588 if env:
1589 environ.update(env)
1590
1591 try:
1592 read, write = os.pipe()
1593 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1594 os.close(write)
1595 process_2 = await asyncio.create_subprocess_exec(
1596 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1597 )
1598 os.close(read)
1599 stdout, stderr = await process_2.communicate()
1600
1601 return_code = process_2.returncode
1602
1603 output = ""
1604 if stdout:
1605 output = stdout.decode("utf-8").strip()
1606 # output = stdout.decode()
1607 if stderr:
1608 output = stderr.decode("utf-8").strip()
1609 # output = stderr.decode()
1610
1611 if return_code != 0 and show_error_log:
1612 self.log.debug(
1613 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1614 )
1615 else:
1616 self.log.debug("Return code: {}".format(return_code))
1617
1618 if raise_exception_on_error and return_code != 0:
1619 raise K8sException(output)
1620
1621 if encode_utf8:
1622 output = output.encode("utf-8").strip()
1623 output = str(output).replace("\\n", "\n")
1624
1625 return output, return_code
1626 except asyncio.CancelledError:
1627 raise
1628 except K8sException:
1629 raise
1630 except Exception as e:
1631 msg = "Exception executing command: {} -> {}".format(command, e)
1632 self.log.error(msg)
1633 if raise_exception_on_error:
1634 raise K8sException(e) from e
1635 else:
1636 return "", -1
1637
1638 async def _get_service(self, cluster_id, service_name, namespace):
1639 """
1640 Obtains the data of the specified service in the k8cluster.
1641
1642 :param cluster_id: id of a K8s cluster known by OSM
1643 :param service_name: name of the K8s service in the specified namespace
1644 :param namespace: K8s namespace used by the KDU instance
1645 :return: If successful, it will return a service with the following data:
1646 - `name` of the service
1647 - `type` type of service in the k8 cluster
1648 - `ports` List of ports offered by the service, for each port includes at least
1649 name, port, protocol
1650 - `cluster_ip` Internal ip to be used inside k8s cluster
1651 - `external_ip` List of external ips (in case they are available)
1652 """
1653
1654 # init config, env
1655 paths, env = self._init_paths_env(
1656 cluster_name=cluster_id, create_if_not_exist=True
1657 )
1658
1659 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1660 self.kubectl_command, paths["kube_config"], namespace, service_name
1661 )
1662
1663 output, _rc = await self._local_async_exec(
1664 command=command, raise_exception_on_error=True, env=env
1665 )
1666
1667 data = yaml.load(output, Loader=yaml.SafeLoader)
1668
1669 service = {
1670 "name": service_name,
1671 "type": self._get_deep(data, ("spec", "type")),
1672 "ports": self._get_deep(data, ("spec", "ports")),
1673 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1674 }
1675 if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            if ip_map_list:
                # external IPs are only available once the load balancer is provisioned
                ip_list = [elem["ip"] for elem in ip_map_list]
                service["external_ip"] = ip_list
1679
1680 return service
1681
1682 async def _exec_get_command(
1683 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1684 ):
1685 """Obtains information about the kdu instance."""
1686
1687 full_command = self._get_get_command(
1688 get_command, kdu_instance, namespace, kubeconfig
1689 )
1690
1691 output, _rc = await self._local_async_exec(command=full_command)
1692
1693 return output
1694
1695 async def _exec_inspect_command(
1696 self, inspect_command: str, kdu_model: str, repo_url: str = None
1697 ):
1698 """Obtains information about an Helm Chart package (´helm show´ command)
1699
1700 Args:
1701 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
            kdu_model: The name or path of a Helm Chart
1703 repo_url: Helm Chart repository url
1704
1705 Returns:
1706 str: the requested info about the Helm Chart package
1707 """
1708
1709 repo_str = ""
1710 if repo_url:
1711 repo_str = " --repo {}".format(repo_url)
1712
1713 idx = kdu_model.find("/")
1714 if idx >= 0:
1715 idx += 1
1716 kdu_model = kdu_model[idx:]
1717
1718 kdu_model, version = self._split_version(kdu_model)
1719 if version:
1720 version_str = "--version {}".format(version)
1721 else:
1722 version_str = ""
1723
1724 full_command = self._get_inspect_command(
1725 inspect_command, kdu_model, repo_str, version_str
1726 )
1727
1728 output, _rc = await self._local_async_exec(command=full_command)
1729
1730 return output
1731
1732 async def _get_replica_count_url(
1733 self,
1734 kdu_model: str,
1735 repo_url: str = None,
1736 resource_name: str = None,
1737 ):
1738 """Get the replica count value in the Helm Chart Values.
1739
1740 Args:
            kdu_model: The name or path of a Helm Chart
1742 repo_url: Helm Chart repository url
1743 resource_name: Resource name
1744
1745 Returns:
1746 True if replicas, False replicaCount
1747 """
1748
1749 kdu_values = yaml.load(
1750 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1751 Loader=yaml.SafeLoader,
1752 )
1753
1754 if not kdu_values:
1755 raise K8sException(
1756 "kdu_values not found for kdu_model {}".format(kdu_model)
1757 )
1758
1759 if resource_name:
1760 kdu_values = kdu_values.get(resource_name, None)
1761
1762 if not kdu_values:
1763 msg = "resource {} not found in the values in model {}".format(
1764 resource_name, kdu_model
1765 )
1766 self.log.error(msg)
1767 raise K8sException(msg)
1768
1769 duplicate_check = False
1770
1771 replica_str = ""
1772 replicas = None
1773
1774 if kdu_values.get("replicaCount", None):
1775 replicas = kdu_values["replicaCount"]
1776 replica_str = "replicaCount"
1777 elif kdu_values.get("replicas", None):
1778 duplicate_check = True
1779 replicas = kdu_values["replicas"]
1780 replica_str = "replicas"
1781 else:
1782 if resource_name:
1783 msg = (
1784 "replicaCount or replicas not found in the resource"
1785 "{} values in model {}. Cannot be scaled".format(
1786 resource_name, kdu_model
1787 )
1788 )
1789 else:
1790 msg = (
1791 "replicaCount or replicas not found in the values"
1792 "in model {}. Cannot be scaled".format(kdu_model)
1793 )
1794 self.log.error(msg)
1795 raise K8sException(msg)
1796
        # Check whether replicas and replicaCount exist at the same time
        msg = "replicaCount and replicas exist at the same time"
1799 if duplicate_check:
1800 if "replicaCount" in kdu_values:
1801 self.log.error(msg)
1802 raise K8sException(msg)
1803 else:
1804 if "replicas" in kdu_values:
1805 self.log.error(msg)
1806 raise K8sException(msg)
1807
1808 return replicas, replica_str
1809
1810 async def _get_replica_count_instance(
1811 self,
1812 kdu_instance: str,
1813 namespace: str,
1814 kubeconfig: str,
1815 resource_name: str = None,
1816 ):
1817 """Get the replica count value in the instance.
1818
1819 Args:
1820 kdu_instance: The name of the KDU instance
1821 namespace: KDU instance namespace
1822 kubeconfig:
1823 resource_name: Resource name
1824
1825 Returns:
1826 True if replicas, False replicaCount
1827 """
1828
1829 kdu_values = yaml.load(
1830 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1831 Loader=yaml.SafeLoader,
1832 )
1833
1834 replicas = None
1835
1836 if kdu_values:
1837 resource_values = (
1838 kdu_values.get(resource_name, None) if resource_name else None
1839 )
1840 replicas = (
1841 (
1842 resource_values.get("replicaCount", None)
1843 or resource_values.get("replicas", None)
1844 )
1845 if resource_values
1846 else (
1847 kdu_values.get("replicaCount", None)
1848 or kdu_values.get("replicas", None)
1849 )
1850 )
1851
1852 return replicas
1853
1854 async def _store_status(
1855 self,
1856 cluster_id: str,
1857 operation: str,
1858 kdu_instance: str,
1859 namespace: str = None,
1860 check_every: float = 10,
1861 db_dict: dict = None,
1862 run_once: bool = False,
1863 ):
1864 while True:
1865 try:
1866 await asyncio.sleep(check_every)
1867 detailed_status = await self._status_kdu(
1868 cluster_id=cluster_id,
1869 kdu_instance=kdu_instance,
1870 yaml_format=False,
1871 namespace=namespace,
1872 )
1873 status = detailed_status.get("info").get("description")
1874 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1875 # write status to db
1876 result = await self.write_app_status_to_db(
1877 db_dict=db_dict,
1878 status=str(status),
1879 detailed_status=str(detailed_status),
1880 operation=operation,
1881 )
1882 if not result:
1883 self.log.info("Error writing in database. Task exiting...")
1884 return
1885 except asyncio.CancelledError:
1886 self.log.debug("Task cancelled")
1887 return
1888 except Exception as e:
1889 self.log.debug(
1890 "_store_status exception: {}".format(str(e)), exc_info=True
1891 )
1892 pass
1893 finally:
1894 if run_once:
1895 return
1896
1897 # params for use in -f file
1898 # returns values file option and filename (in order to delete it at the end)
1899 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1900
1901 if params and len(params) > 0:
1902 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1903
1904 def get_random_number():
1905 r = random.randrange(start=1, stop=99999999)
1906 s = str(r)
1907 while len(s) < 10:
1908 s = "0" + s
1909 return s
1910
1911 params2 = dict()
1912 for key in params:
1913 value = params.get(key)
1914 if "!!yaml" in str(value):
1915 value = yaml.safe_load(value[7:])
1916 params2[key] = value
1917
1918 values_file = get_random_number() + ".yaml"
1919 with open(values_file, "w") as stream:
1920 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1921
1922 return "-f {}".format(values_file), values_file
1923
1924 return "", None
1925
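    # Illustrative example (comment only): values prefixed with "!!yaml " are parsed as
    # YAML before being dumped to the temporary values file, e.g.
    #
    #   self._params_to_file_option(cluster_id, {"replicaCount": 2, "tags": "!!yaml [a, b]"})
    #   -> ("-f <random>.yaml", "<random>.yaml")   # "tags" is stored as the list ["a", "b"]
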
1926 # params for use in --set option
1927 @staticmethod
1928 def _params_to_set_option(params: dict) -> str:
1929 params_str = ""
1930 if params and len(params) > 0:
1931 start = True
1932 for key in params:
1933 value = params.get(key, None)
1934 if value is not None:
1935 if start:
1936 params_str += "--set "
1937 start = False
1938 else:
1939 params_str += ","
1940 params_str += "{}={}".format(key, value)
1941 return params_str
1942
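    # Illustrative example (comment only) of the "--set" string built above:
    #
    #   _params_to_set_option({"replicaCount": 2, "service.type": "NodePort"})
    #   -> "--set replicaCount=2,service.type=NodePort"
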
1943 @staticmethod
1944 def generate_kdu_instance_name(**kwargs):
1945 chart_name = kwargs["kdu_model"]
        # check embedded chart (file or dir)
1947 if chart_name.startswith("/"):
1948 # extract file or directory name
1949 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1950 # check URL
1951 elif "://" in chart_name:
1952 # extract last portion of URL
1953 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1954
1955 name = ""
1956 for c in chart_name:
1957 if c.isalpha() or c.isnumeric():
1958 name += c
1959 else:
1960 name += "-"
1961 if len(name) > 35:
1962 name = name[0:35]
1963
1964 # if does not start with alpha character, prefix 'a'
1965 if not name[0].isalpha():
1966 name = "a" + name
1967
1968 name += "-"
1969
1970 def get_random_number():
1971 r = random.randrange(start=1, stop=99999999)
1972 s = str(r)
1973 s = s.rjust(10, "0")
1974 return s
1975
1976 name = name + get_random_number()
1977 return name.lower()
1978
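    # Illustrative example (comment only) of the generated name; the trailing ten digits
    # come from the random suffix:
    #
    #   generate_kdu_instance_name(kdu_model="stable/openldap:1.2.3")
    #   -> "stable-openldap-1-2-3-0123456789"   # example value, suffix is random
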
1979 def _split_version(self, kdu_model: str) -> (str, str):
1980 version = None
1981 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
1982 parts = kdu_model.split(sep=":")
1983 if len(parts) == 2:
1984 version = str(parts[1])
1985 kdu_model = parts[0]
1986 return kdu_model, version
1987
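    # Illustrative examples (comment only) of how a kdu_model reference is decomposed:
    #
    #   self._split_version("bitnami/mongodb:10.3.1")  -> ("bitnami/mongodb", "10.3.1")
    #   await self._split_repo("bitnami/mongodb")       -> "bitnami"  (coroutine, must be awaited)
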
1988 async def _split_repo(self, kdu_model: str) -> str:
1989 repo_name = None
1990 idx = kdu_model.find("/")
1991 if idx >= 0:
1992 repo_name = kdu_model[:idx]
1993 return repo_name
1994
1995 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1996 """Obtain the Helm repository for an Helm Chart
1997
1998 Args:
1999 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2000 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2001
2002 Returns:
2003 str: the repository URL; if Helm Chart is a local one, the function returns None
2004 """
2005
2006 repo_url = None
2007 idx = kdu_model.find("/")
2008 if idx >= 0:
2009 repo_name = kdu_model[:idx]
2010 # Find repository link
2011 local_repo_list = await self.repo_list(cluster_uuid)
            for repo in local_repo_list:
                # keep the URL of the matching repo and stop; previously every iteration
                # overwrote repo_url, so only the last repository in the list was considered
                if repo["name"] == repo_name:
                    repo_url = repo["url"]
                    break
        return repo_url