Bug 2066 fixed
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Build the state shared by the helm-based K8s connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """
    # the parent constructor receives db, log and the update callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # release names are generated from random numbers: seed with the clock
    random.seed(time.time())

    self.fs = fs

    # both executables are mandatory: the check is asked to raise if missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # the literal string "None" in the configuration means "no stable repo"
    configured_repo_url = self.config.get("stablerepourl")
    self._stable_repo_url = (
        None if configured_repo_url == "None" else configured_repo_url
    )

    # lock to avoid concurrent execution of helm commands
    self.cmd_lock = asyncio.Lock()
95
96 def _get_namespace(self, cluster_uuid: str) -> str:
97 """
98 Obtains the namespace used by the cluster with the uuid passed by argument
99
100 param: cluster_uuid: cluster's uuid
101 """
102
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster = self.db.get_one(
105 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
106 )
107 return k8scluster.get("namespace")
108
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """

    cluster_id = reuse_cluster_uuid or str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The credentials must never be readable by other users. The third
    # positional argument of the builtin open() is the buffer size, not the
    # permission bits, so the file is created through os.open(), which does
    # take a creation mode.
    kube_config = paths["kube_config"]
    owner_only = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(kube_config, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
    with os.fdopen(fd, "w") as f:
        # the creation mode is ignored for a file that already exists:
        # tighten it before any credential is written
        os.chmod(kube_config, owner_only)
        f.write(k8s_creds)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
156
async def repo_add(
    self,
    cluster_uuid: str,
    name: str,
    url: str,
    repo_type: str = "chart",
    cert: str = None,
    user: str = None,
    password: str = None,
):
    """
    Registers a helm repository in the cluster environment and updates it

    :param cluster_uuid: the cluster whose helm environment is used
    :param name: name given to the repository
    :param url: URL of the repository
    :param repo_type: only used in the log message
    :param cert: optional CA certificate content, written to a file that is
        passed to helm with --ca-file
    :param user: optional user name for the repository
    :param password: optional password for the repository
    :return: None. The "repo add" command raises on error, the subsequent
        "repo update" does not
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo add name url
    # every caller-supplied value is quoted so that blanks or shell
    # metacharacters cannot split or alter the command line
    command = ("env KUBECONFIG={} {} repo add {} {}").format(
        paths["kube_config"], self._helm_command, shlex.quote(name), shlex.quote(url)
    )

    if cert:
        temp_cert_file = os.path.join(
            self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
        )
        os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
        with open(temp_cert_file, "w") as the_cert:
            the_cert.write(cert)
        command += " --ca-file {}".format(shlex.quote(temp_cert_file))

    if user:
        command += " --username={}".format(shlex.quote(user))

    # the password goes into the executed command only, never into the log
    logged_command = command
    if password:
        logged_command = command + " --password=********"
        command += " --password={}".format(shlex.quote(password))

    self.log.debug("adding repo: {}".format(logged_command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # helm repo update
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
217
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Runs "helm repo update" for one repository of the cluster environment

    :param cluster_uuid: the cluster whose helm environment is used
    :param name: name of the repository to update
    :param repo_type: only used in the log message
    :return: None. A failing helm command does not raise
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(
            cluster_uuid, repo_type, name
        )
    )

    # init_env
    _paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    # the repository name is quoted so that it stays a single argument
    command = "{} repo update {}".format(self._helm_command, shlex.quote(name))
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
242
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the cluster whose helm environment is queried
    :return: the parsed YAML output of "helm repo list" with all keys
        lowercased, or an empty list when the command fails or prints nothing
    """

    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # no exception on error: a failing command is reported as "no repos"
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    if _rc != 0 or not output:
        return []

    # unify format between helm2 and helm3 setting all keys lowercase
    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    return self._lower_keys_list(parsed_repos)
281
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Removes a repository from the helm environment of the cluster

    :param cluster_uuid: the cluster whose helm environment is used
    :param name: name of the repository to remove
    :return: None. A failing helm command raises
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the repository name is quoted so that it stays a single argument
    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
304
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are
        uninstalled instead of blocking the software removal
    :param uninstall_sw: Boolean to request the removal of the software
        installed in the cluster (done through `_uninstall_sw`). It is skipped
        when releases exist and `force` is not set
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # bound before the try so that the handler can always
                    # report which release failed
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_uuid)
                self.log.warning(msg)
                # Allow to remove k8s cluster without removing Tiller
                uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
376
377 def _is_helm_chart_a_file(self, chart_name: str):
378 return chart_name.count("/") > 1
379
380 async def _install_impl(
381 self,
382 cluster_id: str,
383 kdu_model: str,
384 paths: dict,
385 env: dict,
386 kdu_instance: str,
387 atomic: bool = True,
388 timeout: float = 300,
389 params: dict = None,
390 db_dict: dict = None,
391 kdu_name: str = None,
392 namespace: str = None,
393 ):
394 # init env, paths
395 paths, env = self._init_paths_env(
396 cluster_name=cluster_id, create_if_not_exist=True
397 )
398
399 # params to str
400 params_str, file_to_delete = self._params_to_file_option(
401 cluster_id=cluster_id, params=params
402 )
403
404 # version
405 kdu_model, version = self._split_version(kdu_model)
406
407 _, repo = self._split_repo(kdu_model)
408 if repo:
409 await self.repo_update(cluster_id, repo)
410
411 command = self._get_install_command(
412 kdu_model,
413 kdu_instance,
414 namespace,
415 params_str,
416 version,
417 atomic,
418 timeout,
419 paths["kube_config"],
420 )
421
422 self.log.debug("installing: {}".format(command))
423
424 if atomic:
425 # exec helm in a task
426 exec_task = asyncio.ensure_future(
427 coro_or_future=self._local_async_exec(
428 command=command, raise_exception_on_error=False, env=env
429 )
430 )
431
432 # write status in another task
433 status_task = asyncio.ensure_future(
434 coro_or_future=self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
440 )
441 )
442
443 # wait for execution task
444 await asyncio.wait([exec_task])
445
446 # cancel status task
447 status_task.cancel()
448
449 output, rc = exec_task.result()
450
451 else:
452
453 output, rc = await self._local_async_exec(
454 command=command, raise_exception_on_error=False, env=env
455 )
456
457 # remove temporal values yaml file
458 if file_to_delete:
459 os.remove(file_to_delete)
460
461 # write final status
462 await self._store_status(
463 cluster_id=cluster_id,
464 kdu_instance=kdu_instance,
465 namespace=namespace,
466 db_dict=db_dict,
467 operation="install",
468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrades an existing KDU instance with helm

    :param cluster_uuid: the cluster where the release lives
    :param kdu_instance: release name; it must exist in the cluster
    :param kdu_model: chart reference; the version is split from it
    :param atomic: passed to the upgrade command; when set the status is also
        stored in a parallel task while helm runs
    :param timeout: passed to the upgrade command
    :param params: values for the chart, converted into a temporary file
    :param db_dict: passed to `_store_status`
    :return: the revision reported for the instance after the upgrade, or 0
        when the instance is no longer found
    :raises K8sException: when the instance does not exist or the helm
        command returns a non-zero code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    # from here on the temporary values file exists: it is removed in the
    # finally clause whatever happens while preparing or running the command
    try:
        # version
        kdu_model, version = self._split_version(kdu_model)

        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_uuid, repo)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
591
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    cluster_uuid: str = None,
    kdu_model: str = None,
    atomic: bool = True,
    db_dict: dict = None,
    **kwargs,
):
    """Scale a resource in a Helm Chart.

    Args:
        kdu_instance: KDU instance name
        scale: Scale to which to set the resource
        resource_name: Resource name
        total_timeout: The time, in seconds, to wait
        cluster_uuid: The UUID of the cluster
        kdu_model: The chart reference
        atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
            The --wait flag will be set automatically if --atomic is used
        db_dict: Dictionary for any additional data
        kwargs: Additional parameters

    Returns:
        True if successful

    Raises:
        K8sException: the instance is not found or the helm command returns
            a non-zero code
    """

    if resource_name:
        trace = "scaling resource {} in model {} (cluster {})".format(
            resource_name, kdu_model, cluster_uuid
        )
    else:
        trace = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
    self.log.debug(trace)

    # the instance record provides the namespace of the release
    # (get_instance_info function calls the sync command)
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    kdu_model, version = self._split_version(kdu_model)
    repo_url = await self._find_repo(kdu_model, cluster_uuid)
    _, replica_str = await self._get_replica_count_url(
        kdu_model, repo_url, resource_name
    )

    release_namespace = instance_info["namespace"]
    command = self._get_upgrade_scale_command(
        kdu_model,
        kdu_instance,
        release_namespace,
        scale,
        version,
        atomic,
        replica_str,
        total_timeout,
        resource_name,
        paths["kube_config"],
    )
    self.log.debug("scaling: {}".format(command))

    # same arguments for the intermediate and the final status report
    status_args = {
        "cluster_id": cluster_uuid,
        "kdu_instance": kdu_instance,
        "namespace": release_namespace,
        "db_dict": db_dict,
        "operation": "scale",
    }

    helm_call = self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )
    if atomic:
        # helm runs in one task while the status is written in another one
        exec_task = asyncio.ensure_future(coro_or_future=helm_call)
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(**status_args)
        )

        # only the helm task is awaited; the status task is cancelled afterwards
        await asyncio.wait([exec_task])
        status_task.cancel()
        output, rc = exec_task.result()
    else:
        output, rc = await helm_call

    # write final status
    await self._store_status(**status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return True
713
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    cluster_uuid: str,
    kdu_model: str,
    **kwargs,
) -> int:
    """Get a resource scale count.

    Args:
        cluster_uuid: The UUID of the cluster
        resource_name: Resource name
        kdu_instance: KDU instance name
        kdu_model: The name or path of an Helm Chart
        kwargs: Additional parameters

    Returns:
        Resource instance count

    Raises:
        K8sException: the instance is not found, or no replica count can be
            obtained from the instance nor from the chart
    """

    self.log.debug(
        "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
    )

    # the instance record provides the namespace of the release
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    paths, _ = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # first choice: the value held by the deployed instance
    replicas = await self._get_replica_count_instance(
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        kubeconfig=paths["kube_config"],
        resource_name=resource_name,
    )

    self.log.debug(
        f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
    )

    if replicas is not None:
        return int(replicas)

    # Fallback: default value of the chart package. This is only expected in
    # the first scaling operation, since later ones should find the value in
    # the instance itself.
    repo_url = await self._find_repo(kdu_model=kdu_model, cluster_uuid=cluster_uuid)
    replicas, _ = await self._get_replica_count_url(
        kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
    )

    self.log.debug(
        f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
        f"{resource_name} obtained: {replicas}"
    )

    if replicas is None:
        msg = "Replica count not found. Cannot be scaled"
        self.log.error(msg)
        raise K8sException(msg)

    return int(replicas)
783
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Rolls a KDU instance back to a given revision with helm

    :param cluster_uuid: the cluster where the release lives
    :param kdu_instance: release name; it must exist in the cluster
    :param revision: revision passed to the rollback command
    :param db_dict: passed to `_store_status`
    :return: the revision reported for the instance after the rollback, or 0
        when the instance is no longer found
    :raises K8sException: when the instance does not exist or the helm
        command returns a non-zero code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the instance record provides the namespace of the release
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    release_namespace = instance_info["namespace"]
    command = self._get_rollback_command(
        kdu_instance, release_namespace, revision, paths["kube_config"]
    )
    self.log.debug("rolling_back: {}".format(command))

    # same arguments for the intermediate and the final status report
    status_args = {
        "cluster_id": cluster_uuid,
        "kdu_instance": kdu_instance,
        "namespace": release_namespace,
        "db_dict": db_dict,
        "operation": "rollback",
    }

    # helm runs in one task while the status is written in another one
    exec_task = asyncio.ensure_future(
        coro_or_future=self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    status_task = asyncio.ensure_future(
        coro_or_future=self._store_status(**status_args)
    )

    # only the helm task is awaited; the status task is cancelled afterwards
    await asyncio.wait([exec_task])
    status_task.cancel()
    output, rc = exec_task.result()

    # write final status
    await self._store_status(**status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0
    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
867
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance does not exist; otherwise the output of
        the uninstall command converted by `_output_to_table`
    """

    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the instance record provides the namespace of the release; a missing
    # instance is not an error, there is simply nothing to remove
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
        return True

    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    release_namespace = instance_info["namespace"]
    command = self._get_uninstall_command(
        kdu_instance, release_namespace, paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    table = self._output_to_table(output)
    return table
913
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the internal `_instances_list` call returns for the cluster
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # the internal listing is wrapped between the two file system syncs
    self.fs.sync(from_path=cluster_uuid)
    releases = await self._instances_list(cluster_uuid)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
934
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Looks for a release by name among the releases of a cluster

    :param cluster_uuid: the cluster to search in
    :param kdu_instance: name of the release
    :return: the first entry of `instances_list` whose "name" equals
        `kdu_instance`, or None when there is none
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (entry for entry in releases if entry.get("name") == kdu_instance), None
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
942
async def upgrade_charm(
    self,
    ee_id: str = None,
    path: str = None,
    charm_id: str = None,
    charm_type: str = None,
    timeout: float = None,
) -> str:
    """This method upgrade charms in VNFs

    It is not available for Helm: the call always fails.

    Args:
        ee_id:  Execution environment id
        path:   Local path to the charm
        charm_id: charm-id
        charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
        timeout: (Float)    Timeout for the ns update operation

    Raises:
        K8sException: always
    """
    not_supported = "KDUs deployed with Helm do not support charm upgrade"
    raise K8sException(not_supported)
964
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    It is not available for Helm: the call always fails.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    not_supported = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(not_supported)
991
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # names of the services of the kdu first, then the detail of each one,
    # queried one after the other
    service_names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )
    service_list = [
        await self._get_service(cluster_uuid, service_name, namespace)
        for service_name in service_names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
1039
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtains the data of one service of the cluster

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: whatever the internal `_get_service` call returns
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # the internal query is wrapped between the two file system syncs
    self.fs.sync(from_path=cluster_uuid)
    service_data = await self._get_service(cluster_uuid, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_data
1059
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
) -> Union[str, dict]:
    """
    This call retrieves the current state of a given KDU instance. It allows
    to retrieve the _composition_ (i.e. K8s objects) and _specific values_ of
    the configuration parameters applied to a given instance. This call is
    based on the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :param yaml_format: if the return shall be returned as an YAML string or as a
        dictionary
    :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
            - UNKNOWN
            - DEPLOYED
            - DELETED
            - SUPERSEDED
            - FAILED or
            - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
          and the status of those resources
        - Last `deployment_time`.
    :raises K8sException: when the instance is not found in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    instances = await self._instances_list(cluster_id=cluster_uuid)
    instance = next(
        (entry for entry in instances if entry.get("name") == kdu_instance), None
    )
    if instance is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
1122
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """Obtain the values of a KDU instance.

    Delegates to `_exec_get_command` using the `values` sub-command.

    Args:
        kdu_instance: name of the KDU instance
        namespace: namespace passed through to the get command
        kubeconfig: kubeconfig passed through to the get command

    Returns:
        the output produced by the get command
    """

    self.log.debug(f"get kdu_instance values {kdu_instance}")

    values = await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
    return values
1135
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Obtain the values of an Helm Chart package.

    Delegates to `_exec_inspect_command` using the `values` sub-command.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url (optional)

    Returns:
        str: the values of the Helm Chart package
    """

    message = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(message)

    chart_values = await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_values
1156
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Obtain the readme of an Helm Chart package.

    Delegates to `_exec_inspect_command` using the `readme` sub-command.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url (optional)

    Returns:
        str: the output of the inspect command
    """

    message = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(message)

    readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
1166
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repositories configured for a cluster with the ones
    registered in the database.

    Repositories present in the database but missing locally (or registered
    locally with a different url) are (re)added; repositories present locally
    but unknown to the database are removed, except the one named "stable".

    :param cluster_uuid: identifier of the cluster whose repos are synchronized
    :return: tuple `(deleted_repo_list, added_repo_dict)` where
        - `deleted_repo_list` holds the database `_id` of repos removed because
          their url changed, and the *names* of local repos removed because
          they are not in the database
        - `added_repo_dict` maps the database `_id` of each added repo to its name
    :raises K8sException: when a repo from the database cannot be added
    :raises Exception: on any other unexpected error during the synchronization
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # bound before the try so the error message below can always use it
            repo_id = None
            try:
                repo_id = db_repo.get("_id")
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and and again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    if "ca_cert" in db_repo:
                        await self.repo_add(
                            cluster_uuid,
                            db_repo["name"],
                            db_repo["url"],
                            cert=db_repo["ca_cert"],
                        )
                    else:
                        await self.repo_add(
                            cluster_uuid,
                            db_repo["name"],
                            db_repo["url"],
                        )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repo that cannot be removed must not
                    # abort the synchronization, so it is only logged
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # unexpected failure: log it and report it to the caller
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
1242
1243 def _get_db_repos_dict(self, repo_ids: list):
1244 db_repos_dict = {}
1245 for repo_id in repo_ids:
1246 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1247 db_repos_dict[db_repo["name"]] = db_repo
1248 return db_repos_dict
1249
1250 """
1251 ####################################################################################
1252 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1253 ####################################################################################
1254 """
1255
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    Also creates the helm3 dirs according to the new directory specification;
    those paths are not returned but assigned to helm environment variables.

    Callers in this class unpack the result as `paths, env` and read
    `paths["kube_config"]`.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably whether missing directories are
        created - to be confirmed against the concrete implementations
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
1266
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization.

    To be provided by each helm version specific subclass.

    :param cluster_id: identifier of the cluster to initialize
    :param namespace: namespace to use for the initialization
    :param paths: presumably the paths dictionary produced by `_init_paths_env`
        - to be confirmed against the callers
    :param env: presumably the environment dictionary produced by
        `_init_paths_env` - to be confirmed against the callers
    """
1272
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list.

    `status_kdu` iterates the result and reads the "name" and "namespace"
    keys of each entry.

    :param cluster_id: identifier of the cluster whose instances are listed
    :return: iterable of dictionaries, one per helm instance
    """
1278
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance.

    :param cluster_id: identifier of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param kubeconfig: kubeconfig to use for the query
    """
1284
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict]:
    """
    Implements the helm version dependent method to obtain status of a helm instance.

    :param cluster_id: identifier of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param yaml_format: if the status shall be returned as a YAML string
        instead of a dictionary
    :param show_error_log: presumably whether command failures are logged
        - to be confirmed against the concrete implementations
    :return: the status, as a string or as a dictionary. When a dictionary is
        returned, `_store_status` reads `["info"]["description"]` from it.
    """
1297
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance
    """
1313
@abc.abstractmethod
def _get_upgrade_scale_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    count,
    version,
    atomic,
    replicas,
    timeout,
    resource_name,
    kubeconfig,
) -> str:
    """Obtain command to be executed to upgrade the indicated instance.

    Judging by its name and its `count`, `replicas` and `resource_name`
    parameters this is the upgrade variant used for scaling - to be confirmed
    against the concrete implementations.
    """
1329
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance.

    Takes the same parameters as `_get_install_command`.
    """
1345
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    :param kdu_instance: name of the instance to roll back
    :param namespace: namespace of the instance
    :param revision: revision to roll back to
    :param kubeconfig: kubeconfig to use for the command
    """
1353
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    :param kdu_instance: name of the instance to delete
    :param namespace: namespace of the instance
    :param kubeconfig: kubeconfig to use for the command
    """
1361
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """Generates the command to obtain the information about an Helm Chart package
        (`helm show ...` command)

    Args:
        show_command: the second part of the command (`helm show <show_command>`)
        kdu_model: The name or path of an Helm Chart
        repo_str: the already formatted repository option; `_exec_inspect_command`
            passes either an empty string or " --repo <url>"
        version: the already formatted version option; `_exec_inspect_command`
            passes either an empty string or "--version <version>"

    Returns:
        str: the generated Helm Chart command
    """
1378
@abc.abstractmethod
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """Obtain command to be executed to get information about the kdu instance.

    Args:
        get_command: the kind of information requested (`get_values_kdu`
            passes "values")
        kdu_instance: name of the KDU instance
        namespace: namespace of the KDU instance
        kubeconfig: kubeconfig to use for the command

    Returns:
        the command, which `_exec_get_command` runs through `_local_async_exec`
    """
1384
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method called to uninstall the cluster software for helm. This method
    depends on the helm version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: identifier of the cluster
    :param namespace: namespace where the software is installed
    """
1393
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    :param cluster_uuid: identifier of the cluster
    :return: list of repo identifiers; `synchronize_repos` hands it to
        `_get_db_repos_dict`, which uses each one as the `_id` of a
        "k8srepos" record
    """
1399
1400 """
1401 ####################################################################################
1402 ################################### P R I V A T E ##################################
1403 ####################################################################################
1404 """
1405
1406 @staticmethod
1407 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1408 if os.path.exists(filename):
1409 return True
1410 else:
1411 msg = "File {} does not exist".format(filename)
1412 if exception_if_not_exists:
1413 raise K8sException(msg)
1414
1415 @staticmethod
1416 def _remove_multiple_spaces(strobj):
1417 strobj = strobj.strip()
1418 while " " in strobj:
1419 strobj = strobj.replace(" ", " ")
1420 return strobj
1421
1422 @staticmethod
1423 def _output_to_lines(output: str) -> list:
1424 output_lines = list()
1425 lines = output.splitlines(keepends=False)
1426 for line in lines:
1427 line = line.strip()
1428 if len(line) > 0:
1429 output_lines.append(line)
1430 return output_lines
1431
1432 @staticmethod
1433 def _output_to_table(output: str) -> list:
1434 output_table = list()
1435 lines = output.splitlines(keepends=False)
1436 for line in lines:
1437 line = line.replace("\t", " ")
1438 line_list = list()
1439 output_table.append(line_list)
1440 cells = line.split(sep=" ")
1441 for cell in cells:
1442 cell = cell.strip()
1443 if len(cell) > 0:
1444 line_list.append(cell)
1445 return output_table
1446
1447 @staticmethod
1448 def _parse_services(output: str) -> list:
1449 lines = output.splitlines(keepends=False)
1450 services = []
1451 for line in lines:
1452 line = line.replace("\t", " ")
1453 cells = line.split(sep=" ")
1454 if len(cells) > 0 and cells[0].startswith("service/"):
1455 elems = cells[0].split(sep="/")
1456 if len(elems) > 1:
1457 services.append(elems[1])
1458 return services
1459
1460 @staticmethod
1461 def _get_deep(dictionary: dict, members: tuple):
1462 target = dictionary
1463 value = None
1464 try:
1465 for m in members:
1466 value = target.get(m)
1467 if not value:
1468 return None
1469 else:
1470 target = value
1471 except Exception:
1472 pass
1473 return value
1474
1475 # find key:value in several lines
1476 @staticmethod
1477 def _find_in_lines(p_lines: list, p_key: str) -> str:
1478 for line in p_lines:
1479 try:
1480 if line.startswith(p_key + ":"):
1481 parts = line.split(":")
1482 the_value = parts[1].strip()
1483 return the_value
1484 except Exception:
1485 # ignore it
1486 pass
1487 return None
1488
1489 @staticmethod
1490 def _lower_keys_list(input_list: list):
1491 """
1492 Transform the keys in a list of dictionaries to lower case and returns a new list
1493 of dictionaries
1494 """
1495 new_list = []
1496 if input_list:
1497 for dictionary in input_list:
1498 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1499 new_list.append(new_dict)
1500 return new_list
1501
1502 async def _local_async_exec(
1503 self,
1504 command: str,
1505 raise_exception_on_error: bool = False,
1506 show_error_log: bool = True,
1507 encode_utf8: bool = False,
1508 env: dict = None,
1509 ) -> (str, int):
1510
1511 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1512 self.log.debug(
1513 "Executing async local command: {}, env: {}".format(command, env)
1514 )
1515
1516 # split command
1517 command = shlex.split(command)
1518
1519 environ = os.environ.copy()
1520 if env:
1521 environ.update(env)
1522
1523 try:
1524 async with self.cmd_lock:
1525 process = await asyncio.create_subprocess_exec(
1526 *command,
1527 stdout=asyncio.subprocess.PIPE,
1528 stderr=asyncio.subprocess.PIPE,
1529 env=environ,
1530 )
1531
1532 # wait for command terminate
1533 stdout, stderr = await process.communicate()
1534
1535 return_code = process.returncode
1536
1537 output = ""
1538 if stdout:
1539 output = stdout.decode("utf-8").strip()
1540 # output = stdout.decode()
1541 if stderr:
1542 output = stderr.decode("utf-8").strip()
1543 # output = stderr.decode()
1544
1545 if return_code != 0 and show_error_log:
1546 self.log.debug(
1547 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1548 )
1549 else:
1550 self.log.debug("Return code: {}".format(return_code))
1551
1552 if raise_exception_on_error and return_code != 0:
1553 raise K8sException(output)
1554
1555 if encode_utf8:
1556 output = output.encode("utf-8").strip()
1557 output = str(output).replace("\\n", "\n")
1558
1559 return output, return_code
1560
1561 except asyncio.CancelledError:
1562 # first, kill the process if it is still running
1563 if process.returncode is None:
1564 process.kill()
1565 raise
1566 except K8sException:
1567 raise
1568 except Exception as e:
1569 msg = "Exception executing command: {} -> {}".format(command, e)
1570 self.log.error(msg)
1571 if raise_exception_on_error:
1572 raise K8sException(e) from e
1573 else:
1574 return "", -1
1575
1576 async def _local_async_exec_pipe(
1577 self,
1578 command1: str,
1579 command2: str,
1580 raise_exception_on_error: bool = True,
1581 show_error_log: bool = True,
1582 encode_utf8: bool = False,
1583 env: dict = None,
1584 ):
1585
1586 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1587 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1588 command = "{} | {}".format(command1, command2)
1589 self.log.debug(
1590 "Executing async local command: {}, env: {}".format(command, env)
1591 )
1592
1593 # split command
1594 command1 = shlex.split(command1)
1595 command2 = shlex.split(command2)
1596
1597 environ = os.environ.copy()
1598 if env:
1599 environ.update(env)
1600
1601 try:
1602 async with self.cmd_lock:
1603 read, write = os.pipe()
1604 process_1 = await asyncio.create_subprocess_exec(
1605 *command1, stdout=write, env=environ
1606 )
1607 os.close(write)
1608 process_2 = await asyncio.create_subprocess_exec(
1609 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1610 )
1611 os.close(read)
1612 stdout, stderr = await process_2.communicate()
1613
1614 return_code = process_2.returncode
1615
1616 output = ""
1617 if stdout:
1618 output = stdout.decode("utf-8").strip()
1619 # output = stdout.decode()
1620 if stderr:
1621 output = stderr.decode("utf-8").strip()
1622 # output = stderr.decode()
1623
1624 if return_code != 0 and show_error_log:
1625 self.log.debug(
1626 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1627 )
1628 else:
1629 self.log.debug("Return code: {}".format(return_code))
1630
1631 if raise_exception_on_error and return_code != 0:
1632 raise K8sException(output)
1633
1634 if encode_utf8:
1635 output = output.encode("utf-8").strip()
1636 output = str(output).replace("\\n", "\n")
1637
1638 return output, return_code
1639 except asyncio.CancelledError:
1640 # first, kill the processes if they are still running
1641 for process in (process_1, process_2):
1642 if process.returncode is None:
1643 process.kill()
1644 raise
1645 except K8sException:
1646 raise
1647 except Exception as e:
1648 msg = "Exception executing command: {} -> {}".format(command, e)
1649 self.log.error(msg)
1650 if raise_exception_on_error:
1651 raise K8sException(e) from e
1652 else:
1653 return "", -1
1654
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The service is queried with `kubectl get service -o=yaml` and the
    resulting YAML is parsed with the safe loader.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; it is empty while the load
    balancer has no ingress address assigned.
    :raises K8sException: when the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # `ingress` is missing until an address has been assigned, and an
        # ingress entry may carry a hostname instead of an ip
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list or [] if elem.get("ip")
        ]

    return service
1698
1699 async def _exec_get_command(
1700 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1701 ):
1702 """Obtains information about the kdu instance."""
1703
1704 full_command = self._get_get_command(
1705 get_command, kdu_instance, namespace, kubeconfig
1706 )
1707
1708 output, _rc = await self._local_async_exec(command=full_command)
1709
1710 return output
1711
1712 async def _exec_inspect_command(
1713 self, inspect_command: str, kdu_model: str, repo_url: str = None
1714 ):
1715 """Obtains information about an Helm Chart package (´helm show´ command)
1716
1717 Args:
1718 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1719 kdu_model: The name or path of an Helm Chart
1720 repo_url: Helm Chart repository url
1721
1722 Returns:
1723 str: the requested info about the Helm Chart package
1724 """
1725
1726 repo_str = ""
1727 if repo_url:
1728 repo_str = " --repo {}".format(repo_url)
1729
1730 # Obtain the Chart's name and store it in the var kdu_model
1731 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1732
1733 kdu_model, version = self._split_version(kdu_model)
1734 if version:
1735 version_str = "--version {}".format(version)
1736 else:
1737 version_str = ""
1738
1739 full_command = self._get_inspect_command(
1740 show_command=inspect_command,
1741 kdu_model=kdu_model,
1742 repo_str=repo_str,
1743 version=version_str,
1744 )
1745
1746 output, _ = await self._local_async_exec(command=full_command)
1747
1748 return output
1749
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str = None,
    resource_name: str = None,
) -> (int, str):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica count is looked
            up inside the values of that resource instead of the top level

    Returns:
        A tuple with:
        - The number of replicas found in the values; and
        - The string corresponding to the replica count key in the Helm values
          ("replicaCount" or "replicas")

    Raises:
        K8sException: when the Chart has no values, when the resource is not
            in the values, when neither "replicaCount" nor "replicas" holds a
            value, or when both keys are present at the same time
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
        Loader=yaml.SafeLoader,
    )

    self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    # "replicaCount" takes precedence over "replicas"
    for replica_str in ("replicaCount", "replicas"):
        replicas = kdu_values.get(replica_str)
        if replicas is not None:
            break
    else:
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time: the key
    # that was not selected above must not be present at all
    if "replicaCount" in kdu_values and "replicas" in kdu_values:
        msg = "replicaCount and replicas are exists at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    return replicas, replica_str
1831
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
) -> int:
    """Get the replica count value in the instance.

    The values of the instance are obtained with `get_values_kdu`. When
    `resource_name` is given and the values hold a non-empty entry for it, the
    count is searched there; otherwise it is searched at the top level.
    "replicaCount" is tried before "replicas".

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: kubeconfig forwarded to `get_values_kdu`
        resource_name: Resource name

    Returns:
        The number of replicas of the specific instance; if not found, returns None
    """

    raw_values = await self.get_values_kdu(kdu_instance, namespace, kubeconfig)
    kdu_values = yaml.load(raw_values, Loader=yaml.SafeLoader)

    self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")

    if not kdu_values:
        return None

    resource_values = None
    if resource_name:
        resource_values = kdu_values.get(resource_name, None)
    lookup = resource_values if resource_values else kdu_values

    candidates = (lookup.get(key) for key in ("replicaCount", "replicas"))
    return next((found for found in candidates if found is not None), None)
1875
1876 async def _store_status(
1877 self,
1878 cluster_id: str,
1879 operation: str,
1880 kdu_instance: str,
1881 namespace: str = None,
1882 db_dict: dict = None,
1883 ) -> None:
1884 """
1885 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1886
1887 :param cluster_id (str): the cluster where the KDU instance is deployed
1888 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1889 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1890 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1891 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1892 values for the keys:
1893 - "collection": The Mongo DB collection to write to
1894 - "filter": The query filter to use in the update process
1895 - "path": The dot separated keys which targets the object to be updated
1896 Defaults to None.
1897 """
1898
1899 try:
1900 detailed_status = await self._status_kdu(
1901 cluster_id=cluster_id,
1902 kdu_instance=kdu_instance,
1903 yaml_format=False,
1904 namespace=namespace,
1905 )
1906
1907 status = detailed_status.get("info").get("description")
1908 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1909
1910 # write status to db
1911 result = await self.write_app_status_to_db(
1912 db_dict=db_dict,
1913 status=str(status),
1914 detailed_status=str(detailed_status),
1915 operation=operation,
1916 )
1917
1918 if not result:
1919 self.log.info("Error writing in database. Task exiting...")
1920
1921 except asyncio.CancelledError as e:
1922 self.log.warning(
1923 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1924 )
1925 except Exception as e:
1926 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1927
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the given params into a YAML values file usable with the `-f` option.

    Values whose textual form contains "!!yaml" are parsed as YAML after
    dropping their first 7 characters; every other value is written as is.
    The file gets a random 10 digit name with ".yaml" extension and is
    created relative to the current working directory.

    :param cluster_id: cluster whose paths and environment are initialized
        before writing the file
    :param params: dictionary of values to write
    :return: tuple `("-f <file>", "<file>")`, or `("", None)` when there are
        no params
    """

    if not params or not len(params) > 0:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    resolved = {
        key: yaml.safe_load(item[7:]) if "!!yaml" in str(item) else item
        for key, item in ((name, params.get(name)) for name in params)
    }

    # random number rendered with leading zeros up to 10 characters
    number = str(random.randrange(start=1, stop=99999999))
    values_file = number.rjust(10, "0") + ".yaml"

    with open(values_file, "w") as stream:
        yaml.dump(resolved, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1956
1957 # params for use in --set option
1958 @staticmethod
1959 def _params_to_set_option(params: dict) -> str:
1960 params_str = ""
1961 if params and len(params) > 0:
1962 start = True
1963 for key in params:
1964 value = params.get(key, None)
1965 if value is not None:
1966 if start:
1967 params_str += "--set "
1968 start = False
1969 else:
1970 params_str += ","
1971 params_str += "{}={}".format(key, value)
1972 return params_str
1973
1974 @staticmethod
1975 def generate_kdu_instance_name(**kwargs):
1976 chart_name = kwargs["kdu_model"]
1977 # check embeded chart (file or dir)
1978 if chart_name.startswith("/"):
1979 # extract file or directory name
1980 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1981 # check URL
1982 elif "://" in chart_name:
1983 # extract last portion of URL
1984 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1985
1986 name = ""
1987 for c in chart_name:
1988 if c.isalpha() or c.isnumeric():
1989 name += c
1990 else:
1991 name += "-"
1992 if len(name) > 35:
1993 name = name[0:35]
1994
1995 # if does not start with alpha character, prefix 'a'
1996 if not name[0].isalpha():
1997 name = "a" + name
1998
1999 name += "-"
2000
2001 def get_random_number():
2002 r = random.randrange(start=1, stop=99999999)
2003 s = str(r)
2004 s = s.rjust(10, "0")
2005 return s
2006
2007 name = name + get_random_number()
2008 return name.lower()
2009
2010 def _split_version(self, kdu_model: str) -> (str, str):
2011 version = None
2012 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
2013 parts = kdu_model.split(sep=":")
2014 if len(parts) == 2:
2015 version = str(parts[1])
2016 kdu_model = parts[0]
2017 return kdu_model, version
2018
2019 def _split_repo(self, kdu_model: str) -> (str, str):
2020 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2021
2022 Args:
2023 kdu_model (str): Associated KDU model
2024
2025 Returns:
2026 (str, str): Tuple with the Chart name in index 0, and the repo name
2027 in index 2; if there was a problem finding them, return None
2028 for both
2029 """
2030
2031 chart_name = None
2032 repo_name = None
2033
2034 idx = kdu_model.find("/")
2035 if idx >= 0:
2036 chart_name = kdu_model[idx + 1 :]
2037 repo_name = kdu_model[:idx]
2038
2039 return chart_name, repo_name
2040
2041 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2042 """Obtain the Helm repository for an Helm Chart
2043
2044 Args:
2045 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2046 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2047
2048 Returns:
2049 str: the repository URL; if Helm Chart is a local one, the function returns None
2050 """
2051
2052 _, repo_name = self._split_repo(kdu_model=kdu_model)
2053
2054 repo_url = None
2055 if repo_name:
2056 # Find repository link
2057 local_repo_list = await self.repo_list(cluster_uuid)
2058 for repo in local_repo_list:
2059 if repo["name"] == repo_name:
2060 repo_url = repo["url"]
2061 break # it is not necessary to continue the loop if the repo link was found...
2062
2063 return repo_url