Feature 10957: Add methods for creation of namespace, secret and RBAC
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37 from n2vc.kubectl import Kubectl
38
39
40 class K8sHelmBaseConnector(K8sConnector):
41
42 """
43 ####################################################################################
44 ################################### P U B L I C ####################################
45 ####################################################################################
46 """
47
48 service_account = "osm"
49
    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """Initialize the helm connector and validate required executables.

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database

        Raises an exception (via _check_file_exists) if either the kubectl or
        the helm binary is missing.
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # configuration loaded from the environment (see EnvironConfig)
        self.config = EnvironConfig()
        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # obtain stable repo url from config or apply default;
        # the literal string "None" explicitly disables the stable repo
        self._stable_repo_url = self.config.get("stablerepourl")
        if self._stable_repo_url == "None":
            self._stable_repo_url = None

        # Lock to avoid concurrent execution of helm commands
        self.cmd_lock = asyncio.Lock()
96
97 def _get_namespace(self, cluster_uuid: str) -> str:
98 """
99 Obtains the namespace used by the cluster with the uuid passed by argument
100
101 param: cluster_uuid: cluster's uuid
102 """
103
104 # first, obtain the cluster corresponding to the uuid passed by argument
105 k8scluster = self.db.get_one(
106 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
107 )
108 return k8scluster.get("namespace")
109
110 async def init_env(
111 self,
112 k8s_creds: str,
113 namespace: str = "kube-system",
114 reuse_cluster_uuid=None,
115 **kwargs,
116 ) -> (str, bool):
117 """
118 It prepares a given K8s cluster environment to run Charts
119
120 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
121 '.kube/config'
122 :param namespace: optional namespace to be used for helm. By default,
123 'kube-system' will be used
124 :param reuse_cluster_uuid: existing cluster uuid for reuse
125 :param kwargs: Additional parameters (None yet)
126 :return: uuid of the K8s cluster and True if connector has installed some
127 software in the cluster
128 (on error, an exception will be raised)
129 """
130
131 if reuse_cluster_uuid:
132 cluster_id = reuse_cluster_uuid
133 else:
134 cluster_id = str(uuid4())
135
136 self.log.debug(
137 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
138 )
139
140 paths, env = self._init_paths_env(
141 cluster_name=cluster_id, create_if_not_exist=True
142 )
143 mode = stat.S_IRUSR | stat.S_IWUSR
144 with open(paths["kube_config"], "w", mode) as f:
145 f.write(k8s_creds)
146 os.chmod(paths["kube_config"], 0o600)
147
148 # Code with initialization specific of helm version
149 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
150
151 # sync fs with local data
152 self.fs.reverse_sync(from_path=cluster_id)
153
154 self.log.info("Cluster {} initialized".format(cluster_id))
155
156 return cluster_id, n2vc_installed_sw
157
158 async def repo_add(
159 self,
160 cluster_uuid: str,
161 name: str,
162 url: str,
163 repo_type: str = "chart",
164 cert: str = None,
165 user: str = None,
166 password: str = None,
167 ):
168 self.log.debug(
169 "Cluster {}, adding {} repository {}. URL: {}".format(
170 cluster_uuid, repo_type, name, url
171 )
172 )
173
174 # init_env
175 paths, env = self._init_paths_env(
176 cluster_name=cluster_uuid, create_if_not_exist=True
177 )
178
179 # sync local dir
180 self.fs.sync(from_path=cluster_uuid)
181
182 # helm repo add name url
183 command = ("env KUBECONFIG={} {} repo add {} {}").format(
184 paths["kube_config"], self._helm_command, name, url
185 )
186
187 if cert:
188 temp_cert_file = os.path.join(
189 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
190 )
191 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
192 with open(temp_cert_file, "w") as the_cert:
193 the_cert.write(cert)
194 command += " --ca-file {}".format(temp_cert_file)
195
196 if user:
197 command += " --username={}".format(user)
198
199 if password:
200 command += " --password={}".format(password)
201
202 self.log.debug("adding repo: {}".format(command))
203 await self._local_async_exec(
204 command=command, raise_exception_on_error=True, env=env
205 )
206
207 # helm repo update
208 command = "env KUBECONFIG={} {} repo update {}".format(
209 paths["kube_config"], self._helm_command, name
210 )
211 self.log.debug("updating repo: {}".format(command))
212 await self._local_async_exec(
213 command=command, raise_exception_on_error=False, env=env
214 )
215
216 # sync fs
217 self.fs.reverse_sync(from_path=cluster_uuid)
218
219 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
220 self.log.debug(
221 "Cluster {}, updating {} repository {}".format(
222 cluster_uuid, repo_type, name
223 )
224 )
225
226 # init_env
227 paths, env = self._init_paths_env(
228 cluster_name=cluster_uuid, create_if_not_exist=True
229 )
230
231 # sync local dir
232 self.fs.sync(from_path=cluster_uuid)
233
234 # helm repo update
235 command = "{} repo update {}".format(self._helm_command, name)
236 self.log.debug("updating repo: {}".format(command))
237 await self._local_async_exec(
238 command=command, raise_exception_on_error=False, env=env
239 )
240
241 # sync fs
242 self.fs.reverse_sync(from_path=cluster_uuid)
243
244 async def repo_list(self, cluster_uuid: str) -> list:
245 """
246 Get the list of registered repositories
247
248 :return: list of registered repositories: [ (name, url) .... ]
249 """
250
251 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
252
253 # config filename
254 paths, env = self._init_paths_env(
255 cluster_name=cluster_uuid, create_if_not_exist=True
256 )
257
258 # sync local dir
259 self.fs.sync(from_path=cluster_uuid)
260
261 command = "env KUBECONFIG={} {} repo list --output yaml".format(
262 paths["kube_config"], self._helm_command
263 )
264
265 # Set exception to false because if there are no repos just want an empty list
266 output, _rc = await self._local_async_exec(
267 command=command, raise_exception_on_error=False, env=env
268 )
269
270 # sync fs
271 self.fs.reverse_sync(from_path=cluster_uuid)
272
273 if _rc == 0:
274 if output and len(output) > 0:
275 repos = yaml.load(output, Loader=yaml.SafeLoader)
276 # unify format between helm2 and helm3 setting all keys lowercase
277 return self._lower_keys_list(repos)
278 else:
279 return []
280 else:
281 return []
282
283 async def repo_remove(self, cluster_uuid: str, name: str):
284 self.log.debug(
285 "remove {} repositories for cluster {}".format(name, cluster_uuid)
286 )
287
288 # init env, paths
289 paths, env = self._init_paths_env(
290 cluster_name=cluster_uuid, create_if_not_exist=True
291 )
292
293 # sync local dir
294 self.fs.sync(from_path=cluster_uuid)
295
296 command = "env KUBECONFIG={} {} repo remove {}".format(
297 paths["kube_config"], self._helm_command, name
298 )
299 await self._local_async_exec(
300 command=command, raise_exception_on_error=True, env=env
301 )
302
303 # sync fs
304 self.fs.reverse_sync(from_path=cluster_uuid)
305
    async def reset(
        self,
        cluster_uuid: str,
        force: bool = False,
        uninstall_sw: bool = False,
        **kwargs,
    ) -> bool:
        """Reset a cluster

        Resets the Kubernetes cluster by removing the helm deployment that represents it.

        :param cluster_uuid: The UUID of the cluster to reset
        :param force: if True, uninstall all deployed releases before resetting;
            if False and releases exist, the helm environment is left in place
        :param uninstall_sw: if True, also uninstall the software this connector
            installed in the cluster (via _uninstall_sw)
        :param kwargs: Additional parameters (None yet)
        :return: Returns True if successful or raises an exception.
        """
        namespace = self._get_namespace(cluster_uuid=cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_uuid, uninstall_sw
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("name")
                            chart = r.get("chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            # will not raise exception as it was found
                            # that in some cases of previously installed helm releases it
                            # raised an error
                            self.log.warn(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    # releases exist but force is not set: keep the helm
                    # environment and skip the software uninstall below
                    msg = (
                        "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                    ).format(cluster_uuid)
                    self.log.warn(msg)
                    uninstall_sw = (
                        False  # Allow to remove k8s cluster without removing Tiller
                    )

        if uninstall_sw:
            await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

        # delete cluster directory
        self.log.debug("Removing directory {}".format(cluster_uuid))
        self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
        # Remove also the local directory if it still exists
        direct = self.fs.path + "/" + cluster_uuid
        shutil.rmtree(direct, ignore_errors=True)

        return True
377
378 def _is_helm_chart_a_file(self, chart_name: str):
379 return chart_name.count("/") > 1
380
    async def _install_impl(
        self,
        cluster_id: str,
        kdu_model: str,
        paths: dict,
        env: dict,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """Run the version-specific helm install command for a KDU instance.

        :param cluster_id: uuid of the target k8s cluster
        :param kdu_model: chart reference, optionally suffixed with a version
        :param paths: cluster paths dict (recomputed below; incoming value unused)
        :param env: helm environment dict (recomputed below; incoming value unused)
        :param kdu_instance: helm release name
        :param atomic: if True, run helm and a status-tracking task concurrently
        :param timeout: timeout passed to the install command builder
        :param params: install values, written to a temporary file
        :param db_dict: dictionary with database info to store status
        :param kdu_name: KDU name (not used directly in this method)
        :param namespace: namespace for the release
        :raises K8sException: if the helm command exits with non-zero status
        """
        # init env, paths
        # NOTE: this deliberately overwrites the paths/env arguments received
        # from the caller with freshly computed values
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str (written to a temp values file; deleted further below)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        # split optional version suffix out of the model reference
        kdu_model, version = self._split_version(kdu_model)

        # refresh the repo cache so the requested chart version is visible
        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_id, repo)

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task (periodic DB updates while installing)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task (install finished; final status stored below)
            status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)
474
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        namespace: str = None,
        force: bool = False,
    ):
        """Upgrade a deployed KDU instance with helm.

        :param cluster_uuid: uuid of the k8s cluster
        :param kdu_instance: helm release name to upgrade
        :param kdu_model: chart reference, optionally suffixed with a version
        :param atomic: if True, run helm and a status-tracking task concurrently
        :param timeout: timeout passed to the upgrade command builder
        :param params: upgrade values, written to a temporary file
        :param db_dict: dictionary with database info to store status
        :param namespace: release namespace; looked up from the instance if None
        :param force: passed through to the upgrade command builder
        :return: new revision number of the release, or 0 if it cannot be read
        :raises K8sException: if the instance is not found or helm fails
        """
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace

        # set namespace
        if not namespace:
            instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
            if not instance_info:
                raise K8sException("kdu_instance {} not found".format(kdu_instance))
            namespace = instance_info["namespace"]

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # params to str (temporary values file; deleted further below)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_uuid, params=params
        )

        # split optional version suffix out of the model reference
        kdu_model, version = self._split_version(kdu_model)

        # refresh the repo cache so the requested chart version is visible
        _, repo = self._split_repo(kdu_model)
        if repo:
            await self.repo_update(cluster_uuid, repo)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
            force,
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task (periodic DB updates while upgrading)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task (upgrade finished; final status stored below)
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="upgrade",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
596
    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        cluster_uuid: str = None,
        kdu_model: str = None,
        atomic: bool = True,
        db_dict: dict = None,
        **kwargs,
    ):
        """Scale a resource in a Helm Chart.

        Args:
            kdu_instance: KDU instance name
            scale: Scale to which to set the resource
            resource_name: Resource name
            total_timeout: The time, in seconds, to wait
            cluster_uuid: The UUID of the cluster
            kdu_model: The chart reference
            atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            db_dict: Dictionary for any additional data
            kwargs: Additional parameters

        Returns:
            True if successful (errors raise K8sException instead of
            returning False)
        """

        debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
        if resource_name:
            debug_mgs = "scaling resource {} in model {} (cluster {})".format(
                resource_name, kdu_model, cluster_uuid
            )

        self.log.debug(debug_mgs)

        # look for instance to obtain namespace
        # get_instance_info function calls the sync command
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # split optional version suffix out of the model reference
        kdu_model, version = self._split_version(kdu_model)

        # locate the chart's repository to resolve the replica-count key
        repo_url = await self._find_repo(kdu_model, cluster_uuid)

        _, replica_str = await self._get_replica_count_url(
            kdu_model, repo_url, resource_name
        )

        command = self._get_upgrade_scale_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            scale,
            version,
            atomic,
            replica_str,
            total_timeout,
            resource_name,
            paths["kube_config"],
        )

        self.log.debug("scaling: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task (periodic DB updates while scaling)
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="scale",
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task (scale finished; final status stored below)
            status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="scale",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return True
718
    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        cluster_uuid: str,
        kdu_model: str,
        **kwargs,
    ) -> int:
        """Get a resource scale count.

        Args:
            cluster_uuid: The UUID of the cluster
            resource_name: Resource name
            kdu_instance: KDU instance name
            kdu_model: The name or path of an Helm Chart
            kwargs: Additional parameters

        Returns:
            Resource instance count

        Raises:
            K8sException: if the instance is not found or no replica count
                can be determined from either the instance or the chart
        """

        self.log.debug(
            "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
        )

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, _ = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # first attempt: read the replica count from the deployed instance
        replicas = await self._get_replica_count_instance(
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            kubeconfig=paths["kube_config"],
            resource_name=resource_name,
        )

        self.log.debug(
            f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
        )

        # Get default value if scale count is not found from provided values
        # Important note: this piece of code shall only be executed in the first scaling operation,
        # since it is expected that the _get_replica_count_instance is able to obtain the number of
        # replicas when a scale operation was already conducted previously for this KDU/resource!
        if replicas is None:
            repo_url = await self._find_repo(
                kdu_model=kdu_model, cluster_uuid=cluster_uuid
            )
            replicas, _ = await self._get_replica_count_url(
                kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
            )

            self.log.debug(
                f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
                f"{resource_name} obtained: {replicas}"
            )

            if replicas is None:
                msg = "Replica count not found. Cannot be scaled"
                self.log.error(msg)
                raise K8sException(msg)

        return int(replicas)
788
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """Roll back a KDU instance to a previous revision.

        :param cluster_uuid: uuid of the k8s cluster
        :param kdu_instance: helm release name
        :param revision: target revision to roll back to (passed through to the
            rollback command builder; 0 presumably means "previous revision"
            per helm semantics — confirm against the builder)
        :param db_dict: dictionary with database info to store status
        :return: new revision number of the release, or 0 if it cannot be read
        :raises K8sException: if the instance is not found or helm fails
        """
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_rollback_command(
            kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
        )

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task (periodic DB updates while rolling back)
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task (rollback finished; final status stored below)
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0
872
    async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
        (this call should happen after all _terminate-config-primitive_ of the VNF
        are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name for the KDU instance to be deleted
        :param kwargs: Additional parameters (None yet)
        :return: True if the instance was not found (treated as already
            removed); otherwise the parsed table output of the uninstall
            command (see _output_to_table)
        """

        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_uuid
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            # a missing instance is not an error: nothing to uninstall
            self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
            return True
        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        command = self._get_uninstall_command(
            kdu_instance, instance_info["namespace"], paths["kube_config"]
        )
        # raise_exception_on_error=True: a failed uninstall propagates
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return self._output_to_table(output)
918
919 async def instances_list(self, cluster_uuid: str) -> list:
920 """
921 returns a list of deployed releases in a cluster
922
923 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
924 :return:
925 """
926
927 self.log.debug("list releases for cluster {}".format(cluster_uuid))
928
929 # sync local dir
930 self.fs.sync(from_path=cluster_uuid)
931
932 # execute internal command
933 result = await self._instances_list(cluster_uuid)
934
935 # sync fs
936 self.fs.reverse_sync(from_path=cluster_uuid)
937
938 return result
939
940 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
941 instances = await self.instances_list(cluster_uuid=cluster_uuid)
942 for instance in instances:
943 if instance.get("name") == kdu_instance:
944 return instance
945 self.log.debug("Instance {} not found".format(kdu_instance))
946 return None
947
948 async def upgrade_charm(
949 self,
950 ee_id: str = None,
951 path: str = None,
952 charm_id: str = None,
953 charm_type: str = None,
954 timeout: float = None,
955 ) -> str:
956 """This method upgrade charms in VNFs
957
958 Args:
959 ee_id: Execution environment id
960 path: Local path to the charm
961 charm_id: charm-id
962 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
963 timeout: (Float) Timeout for the ns update operation
964
965 Returns:
966 The output of the update operation if status equals to "completed"
967 """
968 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
969
970 async def exec_primitive(
971 self,
972 cluster_uuid: str = None,
973 kdu_instance: str = None,
974 primitive_name: str = None,
975 timeout: float = 300,
976 params: dict = None,
977 db_dict: dict = None,
978 **kwargs,
979 ) -> str:
980 """Exec primitive (Juju action)
981
982 :param cluster_uuid: The UUID of the cluster or namespace:cluster
983 :param kdu_instance: The unique name of the KDU instance
984 :param primitive_name: Name of action that will be executed
985 :param timeout: Timeout for action execution
986 :param params: Dictionary of all the parameters needed for the action
987 :db_dict: Dictionary for any additional data
988 :param kwargs: Additional parameters (None yet)
989
990 :return: Returns the output of the action
991 """
992 raise K8sException(
993 "KDUs deployed with Helm don't support actions "
994 "different from rollback, upgrade and status"
995 )
996
997 async def get_services(
998 self, cluster_uuid: str, kdu_instance: str, namespace: str
999 ) -> list:
1000 """
1001 Returns a list of services defined for the specified kdu instance.
1002
1003 :param cluster_uuid: UUID of a K8s cluster known by OSM
1004 :param kdu_instance: unique name for the KDU instance
1005 :param namespace: K8s namespace used by the KDU instance
1006 :return: If successful, it will return a list of services, Each service
1007 can have the following data:
1008 - `name` of the service
1009 - `type` type of service in the k8 cluster
1010 - `ports` List of ports offered by the service, for each port includes at least
1011 name, port, protocol
1012 - `cluster_ip` Internal ip to be used inside k8s cluster
1013 - `external_ip` List of external ips (in case they are available)
1014 """
1015
1016 self.log.debug(
1017 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1018 cluster_uuid, kdu_instance
1019 )
1020 )
1021
1022 # init env, paths
1023 paths, env = self._init_paths_env(
1024 cluster_name=cluster_uuid, create_if_not_exist=True
1025 )
1026
1027 # sync local dir
1028 self.fs.sync(from_path=cluster_uuid)
1029
1030 # get list of services names for kdu
1031 service_names = await self._get_services(
1032 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1033 )
1034
1035 service_list = []
1036 for service in service_names:
1037 service = await self._get_service(cluster_uuid, service, namespace)
1038 service_list.append(service)
1039
1040 # sync fs
1041 self.fs.reverse_sync(from_path=cluster_uuid)
1042
1043 return service_list
1044
1045 async def get_service(
1046 self, cluster_uuid: str, service_name: str, namespace: str
1047 ) -> object:
1048 self.log.debug(
1049 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1050 service_name, namespace, cluster_uuid
1051 )
1052 )
1053
1054 # sync local dir
1055 self.fs.sync(from_path=cluster_uuid)
1056
1057 service = await self._get_service(cluster_uuid, service_name, namespace)
1058
1059 # sync fs
1060 self.fs.reverse_sync(from_path=cluster_uuid)
1061
1062 return service
1063
    async def status_kdu(
        self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
    ) -> Union[str, dict]:
        """
        This call would retrieve tha current state of a given KDU instance. It would be
        would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
        values_ of the configuration parameters applied to a given instance. This call
        would be based on the `status` call.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :param yaml_format: if the return shall be returned as an YAML string or as a
            dictionary
        :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
            - UNKNOWN
            - DEPLOYED
            - DELETED
            - SUPERSEDED
            - FAILED or
            - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
            and the status of those resources
        - Last `deployment_time`.

        :raises K8sException: if the instance does not exist in the cluster
        """
        self.log.debug(
            "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # get instance: needed to obtain namespace
        instances = await self._instances_list(cluster_id=cluster_uuid)
        for instance in instances:
            if instance.get("name") == kdu_instance:
                break
        else:
            # instance does not exist
            raise K8sException(
                "Instance name: {} not found in cluster: {}".format(
                    kdu_instance, cluster_uuid
                )
            )

        status = await self._status_kdu(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=instance["namespace"],
            yaml_format=yaml_format,
            show_error_log=True,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        return status
1126
1127 async def get_values_kdu(
1128 self, kdu_instance: str, namespace: str, kubeconfig: str
1129 ) -> str:
1130 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1131
1132 return await self._exec_get_command(
1133 get_command="values",
1134 kdu_instance=kdu_instance,
1135 namespace=namespace,
1136 kubeconfig=kubeconfig,
1137 )
1138
1139 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1140 """Method to obtain the Helm Chart package's values
1141
1142 Args:
1143 kdu_model: The name or path of an Helm Chart
1144 repo_url: Helm Chart repository url
1145
1146 Returns:
1147 str: the values of the Helm Chart package
1148 """
1149
1150 self.log.debug(
1151 "inspect kdu_model values {} from (optional) repo: {}".format(
1152 kdu_model, repo_url
1153 )
1154 )
1155
1156 return await self._exec_inspect_command(
1157 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1158 )
1159
1160 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1161 self.log.debug(
1162 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1163 )
1164
1165 return await self._exec_inspect_command(
1166 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1167 )
1168
1169 async def synchronize_repos(self, cluster_uuid: str):
1170 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1171 try:
1172 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1173 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1174
1175 local_repo_list = await self.repo_list(cluster_uuid)
1176 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1177
1178 deleted_repo_list = []
1179 added_repo_dict = {}
1180
1181 # iterate over the list of repos in the database that should be
1182 # added if not present
1183 for repo_name, db_repo in db_repo_dict.items():
1184 try:
1185 # check if it is already present
1186 curr_repo_url = local_repo_dict.get(db_repo["name"])
1187 repo_id = db_repo.get("_id")
1188 if curr_repo_url != db_repo["url"]:
1189 if curr_repo_url:
1190 self.log.debug(
1191 "repo {} url changed, delete and and again".format(
1192 db_repo["url"]
1193 )
1194 )
1195 await self.repo_remove(cluster_uuid, db_repo["name"])
1196 deleted_repo_list.append(repo_id)
1197
1198 # add repo
1199 self.log.debug("add repo {}".format(db_repo["name"]))
1200 if "ca_cert" in db_repo:
1201 await self.repo_add(
1202 cluster_uuid,
1203 db_repo["name"],
1204 db_repo["url"],
1205 cert=db_repo["ca_cert"],
1206 )
1207 else:
1208 await self.repo_add(
1209 cluster_uuid,
1210 db_repo["name"],
1211 db_repo["url"],
1212 )
1213 added_repo_dict[repo_id] = db_repo["name"]
1214 except Exception as e:
1215 raise K8sException(
1216 "Error adding repo id: {}, err_msg: {} ".format(
1217 repo_id, repr(e)
1218 )
1219 )
1220
1221 # Delete repos that are present but not in nbi_list
1222 for repo_name in local_repo_dict:
1223 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1224 self.log.debug("delete repo {}".format(repo_name))
1225 try:
1226 await self.repo_remove(cluster_uuid, repo_name)
1227 deleted_repo_list.append(repo_name)
1228 except Exception as e:
1229 self.warning(
1230 "Error deleting repo, name: {}, err_msg: {}".format(
1231 repo_name, str(e)
1232 )
1233 )
1234
1235 return deleted_repo_list, added_repo_dict
1236
1237 except K8sException:
1238 raise
1239 except Exception as e:
1240 # Do not raise errors synchronizing repos
1241 self.log.error("Error synchronizing repos: {}".format(e))
1242 raise Exception("Error synchronizing repos: {}".format(e))
1243
1244 def _get_db_repos_dict(self, repo_ids: list):
1245 db_repos_dict = {}
1246 for repo_id in repo_ids:
1247 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1248 db_repos_dict[db_repo["name"]] = db_repo
1249 return db_repos_dict
1250
1251 """
1252 ####################################################################################
1253 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1254 ####################################################################################
1255 """
1256
    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns base cluster and kube dirs and returns them.
        Also creates helm3 dirs according to the new directory specification; those
        paths are not returned but assigned to helm environment variables.

        :param cluster_name: cluster_name
        :param create_if_not_exist: create the directories when they are missing
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
1267
    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization

        :param cluster_id: cluster identifier
        :param namespace: K8s namespace to initialize the helm software in
        :param paths: config paths dictionary (as returned by _init_paths_env)
        :param env: helm environment variables (as returned by _init_paths_env)
        """
1273
    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent helm instances list

        :param cluster_id: the cluster to list the helm releases from
        :return: list of dictionaries; callers (see status_kdu) rely on the
            "name" and "namespace" keys being present in each entry
        """
1279
    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        """
        Implements the helm version dependent method to obtain services from a helm instance

        :param cluster_id: the cluster where the KDU instance lives
        :param kdu_instance: KDU instance name (Helm Chart release)
        :param namespace: namespace where the KDU instance is deployed
        :param kubeconfig: kubeconfig file path
        :return: the services deployed by the helm release (implementation specific)
        """
1285
    @abc.abstractmethod
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """
        Implements the helm version dependent method to obtain status of a helm instance

        :param cluster_id: the cluster where the KDU instance lives
        :param kdu_instance: KDU instance name (Helm Chart release)
        :param namespace: namespace where the KDU instance is deployed
        :param yaml_format: if True return a YAML string, otherwise a dictionary
        :param show_error_log: log failures of the underlying status command
        :return: the release status, as YAML string or dictionary per yaml_format
        """
1298
    @abc.abstractmethod
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """
        Obtain command to be executed to install the indicated instance

        :param kdu_model: Kdu model name (Helm local location or repository)
        :param kdu_instance: KDU instance name (Helm Chart release)
        :param namespace: namespace where the KDU instance will be deployed
        :param params_str: params used to install the Helm Chart release
        :param version: constraint with specific version of the Chart to use
        :param atomic: if set, a failed install is rolled back
        :param timeout: the time, in seconds, to wait
        :param kubeconfig: kubeconfig file path
        :return: the install command
        """
1314
    @abc.abstractmethod
    def _get_upgrade_scale_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        count,
        version,
        atomic,
        replicas,
        timeout,
        resource_name,
        kubeconfig,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            count (int): Scale count (target number of replicas)
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replicas (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """
1347
    @abc.abstractmethod
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
        force,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """
1377
    @abc.abstractmethod
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """
        Obtain command to be executed to rollback the indicated instance

        :param kdu_instance: KDU instance name (Helm Chart release)
        :param namespace: namespace where the KDU instance is deployed
        :param revision: release revision to roll back to
        :param kubeconfig: kubeconfig file path
        :return: the rollback command
        """
1385
    @abc.abstractmethod
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """
        Obtain command to be executed to delete the indicated instance

        :param kdu_instance: KDU instance name (Helm Chart release)
        :param namespace: namespace where the KDU instance is deployed
        :param kubeconfig: kubeconfig file path
        :return: the uninstall command
        """
1393
    @abc.abstractmethod
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generates the command to obtain the information about an Helm Chart package
        (´helm show ...´ command)

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of an Helm Chart
            repo_str: the "--repo <url>" option string (may be empty)
            version: the "--version <constraint>" option string (may be empty)

        Returns:
            str: the generated Helm Chart command
        """
1410
    @abc.abstractmethod
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance.

        :param get_command: the helm `get` subcommand (e.g. "values")
        :param kdu_instance: KDU instance name (Helm Chart release)
        :param namespace: namespace where the KDU instance is deployed
        :param kubeconfig: kubeconfig file path
        :return: the `helm get` command to run
        """
1416
    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method call to uninstall cluster software for helm. This method is dependent
        of helm version
        For Helm v2 it will be called when Tiller must be uninstalled
        For Helm v3 it does nothing and does not need to be called

        :param cluster_id: the cluster to clean up
        :param namespace: K8s namespace used by the helm software
        """
1425
    @abc.abstractmethod
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the cluster repos identifiers

        :param cluster_uuid: K8s cluster uuid
        :return: list of repo identifiers; each id is usable as "_id" in the
            "k8srepos" DB collection (see _get_db_repos_dict)
        """
1431
1432 """
1433 ####################################################################################
1434 ################################### P R I V A T E ##################################
1435 ####################################################################################
1436 """
1437
1438 @staticmethod
1439 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1440 if os.path.exists(filename):
1441 return True
1442 else:
1443 msg = "File {} does not exist".format(filename)
1444 if exception_if_not_exists:
1445 raise K8sException(msg)
1446
1447 @staticmethod
1448 def _remove_multiple_spaces(strobj):
1449 strobj = strobj.strip()
1450 while " " in strobj:
1451 strobj = strobj.replace(" ", " ")
1452 return strobj
1453
1454 @staticmethod
1455 def _output_to_lines(output: str) -> list:
1456 output_lines = list()
1457 lines = output.splitlines(keepends=False)
1458 for line in lines:
1459 line = line.strip()
1460 if len(line) > 0:
1461 output_lines.append(line)
1462 return output_lines
1463
1464 @staticmethod
1465 def _output_to_table(output: str) -> list:
1466 output_table = list()
1467 lines = output.splitlines(keepends=False)
1468 for line in lines:
1469 line = line.replace("\t", " ")
1470 line_list = list()
1471 output_table.append(line_list)
1472 cells = line.split(sep=" ")
1473 for cell in cells:
1474 cell = cell.strip()
1475 if len(cell) > 0:
1476 line_list.append(cell)
1477 return output_table
1478
1479 @staticmethod
1480 def _parse_services(output: str) -> list:
1481 lines = output.splitlines(keepends=False)
1482 services = []
1483 for line in lines:
1484 line = line.replace("\t", " ")
1485 cells = line.split(sep=" ")
1486 if len(cells) > 0 and cells[0].startswith("service/"):
1487 elems = cells[0].split(sep="/")
1488 if len(elems) > 1:
1489 services.append(elems[1])
1490 return services
1491
1492 @staticmethod
1493 def _get_deep(dictionary: dict, members: tuple):
1494 target = dictionary
1495 value = None
1496 try:
1497 for m in members:
1498 value = target.get(m)
1499 if not value:
1500 return None
1501 else:
1502 target = value
1503 except Exception:
1504 pass
1505 return value
1506
1507 # find key:value in several lines
1508 @staticmethod
1509 def _find_in_lines(p_lines: list, p_key: str) -> str:
1510 for line in p_lines:
1511 try:
1512 if line.startswith(p_key + ":"):
1513 parts = line.split(":")
1514 the_value = parts[1].strip()
1515 return the_value
1516 except Exception:
1517 # ignore it
1518 pass
1519 return None
1520
1521 @staticmethod
1522 def _lower_keys_list(input_list: list):
1523 """
1524 Transform the keys in a list of dictionaries to lower case and returns a new list
1525 of dictionaries
1526 """
1527 new_list = []
1528 if input_list:
1529 for dictionary in input_list:
1530 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1531 new_list.append(new_dict)
1532 return new_list
1533
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ) -> (str, int):
        """
        Execute a local command asynchronously, serialized through self.cmd_lock.

        :param command: command line to run; it is shlex-split and executed
            without a shell
        :param raise_exception_on_error: raise K8sException on a non-zero return
            code (or execution failure) instead of returning the output / ("", -1)
        :param show_error_log: log the output at debug level when the command fails
        :param encode_utf8: utf-8 encode the output and unescape literal "\\n"
        :param env: extra environment variables overlaid on a copy of os.environ
        :return: tuple (output, return_code); ("", -1) on execution failure when
            raise_exception_on_error is False
        """
        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command = shlex.split(command)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            async with self.cmd_lock:
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=environ,
                )

                # wait for command terminate
                stdout, stderr = await process.communicate()

                return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
            # NOTE(review): when both streams have content, stderr REPLACES the
            # stdout output below — confirm this is intended and not a lost "+="
            if stderr:
                output = stderr.decode("utf-8").strip()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # first, kill the process if it is still running
            if process.returncode is None:
                process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1606
    async def _local_async_exec_pipe(
        self,
        command1: str,
        command2: str,
        raise_exception_on_error: bool = True,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ):
        """
        Execute two local commands connected by a pipe ("command1 | command2"),
        serialized through self.cmd_lock. Only the output and return code of the
        second command are considered.

        :param command1: producer command (stdout fed into command2)
        :param command2: consumer command, whose output is returned
        :param raise_exception_on_error: raise K8sException when command2 fails
        :param show_error_log: log the output at debug level on failure
        :param encode_utf8: utf-8 encode the output and unescape literal "\\n"
        :param env: extra environment variables overlaid on a copy of os.environ
        :return: tuple (output, return_code); ("", -1) on execution failure when
            raise_exception_on_error is False
        """
        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            async with self.cmd_lock:
                # plain OS pipe: process_1 writes, process_2 reads
                read, write = os.pipe()
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
                os.close(read)
                stdout, stderr = await process_2.communicate()

                return_code = process_2.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
            # NOTE(review): process_2's stderr is not piped, so communicate()
            # returns stderr=None and this branch never triggers — confirm
            if stderr:
                output = stderr.decode("utf-8").strip()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # first, kill the processes if they are still running
            for process in (process_1, process_2):
                if process.returncode is None:
                    process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
1684
1685 async def _get_service(self, cluster_id, service_name, namespace):
1686 """
1687 Obtains the data of the specified service in the k8cluster.
1688
1689 :param cluster_id: id of a K8s cluster known by OSM
1690 :param service_name: name of the K8s service in the specified namespace
1691 :param namespace: K8s namespace used by the KDU instance
1692 :return: If successful, it will return a service with the following data:
1693 - `name` of the service
1694 - `type` type of service in the k8 cluster
1695 - `ports` List of ports offered by the service, for each port includes at least
1696 name, port, protocol
1697 - `cluster_ip` Internal ip to be used inside k8s cluster
1698 - `external_ip` List of external ips (in case they are available)
1699 """
1700
1701 # init config, env
1702 paths, env = self._init_paths_env(
1703 cluster_name=cluster_id, create_if_not_exist=True
1704 )
1705
1706 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1707 self.kubectl_command, paths["kube_config"], namespace, service_name
1708 )
1709
1710 output, _rc = await self._local_async_exec(
1711 command=command, raise_exception_on_error=True, env=env
1712 )
1713
1714 data = yaml.load(output, Loader=yaml.SafeLoader)
1715
1716 service = {
1717 "name": service_name,
1718 "type": self._get_deep(data, ("spec", "type")),
1719 "ports": self._get_deep(data, ("spec", "ports")),
1720 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1721 }
1722 if service["type"] == "LoadBalancer":
1723 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1724 ip_list = [elem["ip"] for elem in ip_map_list]
1725 service["external_ip"] = ip_list
1726
1727 return service
1728
1729 async def _exec_get_command(
1730 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1731 ):
1732 """Obtains information about the kdu instance."""
1733
1734 full_command = self._get_get_command(
1735 get_command, kdu_instance, namespace, kubeconfig
1736 )
1737
1738 output, _rc = await self._local_async_exec(command=full_command)
1739
1740 return output
1741
1742 async def _exec_inspect_command(
1743 self, inspect_command: str, kdu_model: str, repo_url: str = None
1744 ):
1745 """Obtains information about an Helm Chart package (´helm show´ command)
1746
1747 Args:
1748 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1749 kdu_model: The name or path of an Helm Chart
1750 repo_url: Helm Chart repository url
1751
1752 Returns:
1753 str: the requested info about the Helm Chart package
1754 """
1755
1756 repo_str = ""
1757 if repo_url:
1758 repo_str = " --repo {}".format(repo_url)
1759
1760 # Obtain the Chart's name and store it in the var kdu_model
1761 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1762
1763 kdu_model, version = self._split_version(kdu_model)
1764 if version:
1765 version_str = "--version {}".format(version)
1766 else:
1767 version_str = ""
1768
1769 full_command = self._get_inspect_command(
1770 show_command=inspect_command,
1771 kdu_model=kdu_model,
1772 repo_str=repo_str,
1773 version=version_str,
1774 )
1775
1776 output, _ = await self._local_async_exec(command=full_command)
1777
1778 return output
1779
    async def _get_replica_count_url(
        self,
        kdu_model: str,
        repo_url: str = None,
        resource_name: str = None,
    ) -> (int, str):
        """Get the replica count value in the Helm Chart Values.

        Args:
            kdu_model: The name or path of an Helm Chart
            repo_url: Helm Chart repository url
            resource_name: Resource name; when given, only that resource's
                sub-dict of the values is inspected

        Returns:
            A tuple with:
            - The number of replicas of the specific instance; if not found, returns None; and
            - The string corresponding to the replica count key in the Helm values
              ("replicaCount" or "replicas")

        Raises:
            K8sException: when no values are found, when neither replica key is
                present, or when both keys coexist at the same level
        """

        kdu_values = yaml.load(
            await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
            Loader=yaml.SafeLoader,
        )

        self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

        if not kdu_values:
            raise K8sException(
                "kdu_values not found for kdu_model {}".format(kdu_model)
            )

        if resource_name:
            # narrow the search to the resource's own values sub-dict
            kdu_values = kdu_values.get(resource_name, None)

            if not kdu_values:
                msg = "resource {} not found in the values in model {}".format(
                    resource_name, kdu_model
                )
                self.log.error(msg)
                raise K8sException(msg)

        # set when "replicas" (and not "replicaCount") provided the value, so we
        # can later detect the ambiguous case where both keys are present
        duplicate_check = False

        replica_str = ""
        replicas = None

        if kdu_values.get("replicaCount") is not None:
            replicas = kdu_values["replicaCount"]
            replica_str = "replicaCount"
        elif kdu_values.get("replicas") is not None:
            duplicate_check = True
            replicas = kdu_values["replicas"]
            replica_str = "replicas"
        else:
            if resource_name:
                msg = (
                    "replicaCount or replicas not found in the resource"
                    "{} values in model {}. Cannot be scaled".format(
                        resource_name, kdu_model
                    )
                )
            else:
                msg = (
                    "replicaCount or replicas not found in the values"
                    "in model {}. Cannot be scaled".format(kdu_model)
                )
            self.log.error(msg)
            raise K8sException(msg)

        # Control if replicas and replicaCount exists at the same time
        # NOTE(review): reaches here via the "replicas" branch only when
        # replicaCount is None; "replicaCount" in kdu_values still detects an
        # explicit null entry — confirm that is the intended ambiguity check
        msg = "replicaCount and replicas are exists at the same time"
        if duplicate_check:
            if "replicaCount" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)
        else:
            if "replicas" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)

        return replicas, replica_str
1861
1862 async def _get_replica_count_instance(
1863 self,
1864 kdu_instance: str,
1865 namespace: str,
1866 kubeconfig: str,
1867 resource_name: str = None,
1868 ) -> int:
1869 """Get the replica count value in the instance.
1870
1871 Args:
1872 kdu_instance: The name of the KDU instance
1873 namespace: KDU instance namespace
1874 kubeconfig:
1875 resource_name: Resource name
1876
1877 Returns:
1878 The number of replicas of the specific instance; if not found, returns None
1879 """
1880
1881 kdu_values = yaml.load(
1882 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1883 Loader=yaml.SafeLoader,
1884 )
1885
1886 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1887
1888 replicas = None
1889
1890 if kdu_values:
1891 resource_values = (
1892 kdu_values.get(resource_name, None) if resource_name else None
1893 )
1894
1895 for replica_str in ("replicaCount", "replicas"):
1896 if resource_values:
1897 replicas = resource_values.get(replica_str)
1898 else:
1899 replicas = kdu_values.get(replica_str)
1900
1901 if replicas is not None:
1902 break
1903
1904 return replicas
1905
1906 async def _store_status(
1907 self,
1908 cluster_id: str,
1909 operation: str,
1910 kdu_instance: str,
1911 namespace: str = None,
1912 db_dict: dict = None,
1913 ) -> None:
1914 """
1915 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1916
1917 :param cluster_id (str): the cluster where the KDU instance is deployed
1918 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1919 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1920 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1921 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1922 values for the keys:
1923 - "collection": The Mongo DB collection to write to
1924 - "filter": The query filter to use in the update process
1925 - "path": The dot separated keys which targets the object to be updated
1926 Defaults to None.
1927 """
1928
1929 try:
1930 detailed_status = await self._status_kdu(
1931 cluster_id=cluster_id,
1932 kdu_instance=kdu_instance,
1933 yaml_format=False,
1934 namespace=namespace,
1935 )
1936
1937 status = detailed_status.get("info").get("description")
1938 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1939
1940 # write status to db
1941 result = await self.write_app_status_to_db(
1942 db_dict=db_dict,
1943 status=str(status),
1944 detailed_status=str(detailed_status),
1945 operation=operation,
1946 )
1947
1948 if not result:
1949 self.log.info("Error writing in database. Task exiting...")
1950
1951 except asyncio.CancelledError as e:
1952 self.log.warning(
1953 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1954 )
1955 except Exception as e:
1956 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1957
1958 # params for use in -f file
1959 # returns values file option and filename (in order to delete it at the end)
1960 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1961 if params and len(params) > 0:
1962 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1963
1964 def get_random_number():
1965 r = random.randrange(start=1, stop=99999999)
1966 s = str(r)
1967 while len(s) < 10:
1968 s = "0" + s
1969 return s
1970
1971 params2 = dict()
1972 for key in params:
1973 value = params.get(key)
1974 if "!!yaml" in str(value):
1975 value = yaml.safe_load(value[7:])
1976 params2[key] = value
1977
1978 values_file = get_random_number() + ".yaml"
1979 with open(values_file, "w") as stream:
1980 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1981
1982 return "-f {}".format(values_file), values_file
1983
1984 return "", None
1985
1986 # params for use in --set option
1987 @staticmethod
1988 def _params_to_set_option(params: dict) -> str:
1989 params_str = ""
1990 if params and len(params) > 0:
1991 start = True
1992 for key in params:
1993 value = params.get(key, None)
1994 if value is not None:
1995 if start:
1996 params_str += "--set "
1997 start = False
1998 else:
1999 params_str += ","
2000 params_str += "{}={}".format(key, value)
2001 return params_str
2002
2003 @staticmethod
2004 def generate_kdu_instance_name(**kwargs):
2005 chart_name = kwargs["kdu_model"]
2006 # check embeded chart (file or dir)
2007 if chart_name.startswith("/"):
2008 # extract file or directory name
2009 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2010 # check URL
2011 elif "://" in chart_name:
2012 # extract last portion of URL
2013 chart_name = chart_name[chart_name.rfind("/") + 1 :]
2014
2015 name = ""
2016 for c in chart_name:
2017 if c.isalpha() or c.isnumeric():
2018 name += c
2019 else:
2020 name += "-"
2021 if len(name) > 35:
2022 name = name[0:35]
2023
2024 # if does not start with alpha character, prefix 'a'
2025 if not name[0].isalpha():
2026 name = "a" + name
2027
2028 name += "-"
2029
2030 def get_random_number():
2031 r = random.randrange(start=1, stop=99999999)
2032 s = str(r)
2033 s = s.rjust(10, "0")
2034 return s
2035
2036 name = name + get_random_number()
2037 return name.lower()
2038
2039 def _split_version(self, kdu_model: str) -> (str, str):
2040 version = None
2041 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
2042 parts = kdu_model.split(sep=":")
2043 if len(parts) == 2:
2044 version = str(parts[1])
2045 kdu_model = parts[0]
2046 return kdu_model, version
2047
2048 def _split_repo(self, kdu_model: str) -> (str, str):
2049 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2050
2051 Args:
2052 kdu_model (str): Associated KDU model
2053
2054 Returns:
2055 (str, str): Tuple with the Chart name in index 0, and the repo name
2056 in index 2; if there was a problem finding them, return None
2057 for both
2058 """
2059
2060 chart_name = None
2061 repo_name = None
2062
2063 idx = kdu_model.find("/")
2064 if idx >= 0:
2065 chart_name = kdu_model[idx + 1 :]
2066 repo_name = kdu_model[:idx]
2067
2068 return chart_name, repo_name
2069
2070 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2071 """Obtain the Helm repository for an Helm Chart
2072
2073 Args:
2074 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2075 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2076
2077 Returns:
2078 str: the repository URL; if Helm Chart is a local one, the function returns None
2079 """
2080
2081 _, repo_name = self._split_repo(kdu_model=kdu_model)
2082
2083 repo_url = None
2084 if repo_name:
2085 # Find repository link
2086 local_repo_list = await self.repo_list(cluster_uuid)
2087 for repo in local_repo_list:
2088 if repo["name"] == repo_name:
2089 repo_url = repo["url"]
2090 break # it is not necessary to continue the loop if the repo link was found...
2091
2092 return repo_url
2093
2094 async def create_certificate(
2095 self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
2096 ):
2097 paths, env = self._init_paths_env(
2098 cluster_name=cluster_uuid, create_if_not_exist=True
2099 )
2100 kubectl = Kubectl(config_file=paths["kube_config"])
2101 await kubectl.create_certificate(
2102 namespace=namespace,
2103 name=name,
2104 dns_prefix=dns_prefix,
2105 secret_name=secret_name,
2106 usages=[usage],
2107 issuer_name="ca-issuer",
2108 )
2109
2110 async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
2111 paths, env = self._init_paths_env(
2112 cluster_name=cluster_uuid, create_if_not_exist=True
2113 )
2114 kubectl = Kubectl(config_file=paths["kube_config"])
2115 await kubectl.delete_certificate(namespace, certificate_name)
2116
2117 async def create_namespace(
2118 self,
2119 namespace,
2120 cluster_uuid,
2121 ):
2122 """
2123 Create a namespace in a specific cluster
2124
2125 :param namespace: namespace to be created
2126 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2127 :returns: None
2128 """
2129 paths, env = self._init_paths_env(
2130 cluster_name=cluster_uuid, create_if_not_exist=True
2131 )
2132 kubectl = Kubectl(config_file=paths["kube_config"])
2133 await kubectl.create_namespace(
2134 name=namespace,
2135 )
2136
2137 async def delete_namespace(
2138 self,
2139 namespace,
2140 cluster_uuid,
2141 ):
2142 """
2143 Delete a namespace in a specific cluster
2144
2145 :param namespace: namespace to be deleted
2146 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2147 :returns: None
2148 """
2149 paths, env = self._init_paths_env(
2150 cluster_name=cluster_uuid, create_if_not_exist=True
2151 )
2152 kubectl = Kubectl(config_file=paths["kube_config"])
2153 await kubectl.delete_namespace(
2154 name=namespace,
2155 )
2156
2157 async def copy_secret_data(
2158 self,
2159 src_secret: str,
2160 dst_secret: str,
2161 cluster_uuid: str,
2162 data_key: str,
2163 src_namespace: str = "osm",
2164 dst_namespace: str = "osm",
2165 ):
2166 """
2167 Copy a single key and value from an existing secret to a new one
2168
2169 :param src_secret: name of the existing secret
2170 :param dst_secret: name of the new secret
2171 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2172 :param data_key: key of the existing secret to be copied
2173 :param src_namespace: Namespace of the existing secret
2174 :param dst_namespace: Namespace of the new secret
2175 :returns: None
2176 """
2177 paths, env = self._init_paths_env(
2178 cluster_name=cluster_uuid, create_if_not_exist=True
2179 )
2180 kubectl = Kubectl(config_file=paths["kube_config"])
2181 secret_data = await kubectl.get_secret_content(
2182 name=src_secret,
2183 namespace=src_namespace,
2184 )
2185 # Only the corresponding data_key value needs to be copy
2186 data = {data_key: secret_data.get(data_key)}
2187 await kubectl.create_secret(
2188 name=dst_secret,
2189 data=data,
2190 namespace=dst_namespace,
2191 secret_type="Opaque",
2192 )
2193
2194 async def setup_default_rbac(
2195 self,
2196 name,
2197 namespace,
2198 cluster_uuid,
2199 api_groups,
2200 resources,
2201 verbs,
2202 service_account,
2203 ):
2204 """
2205 Create a basic RBAC for a new namespace.
2206
2207 :param name: name of both Role and Role Binding
2208 :param namespace: K8s namespace
2209 :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
2210 :param api_groups: Api groups to be allowed in Policy Rule
2211 :param resources: Resources to be allowed in Policy Rule
2212 :param verbs: Verbs to be allowed in Policy Rule
2213 :param service_account: Service Account name used to bind the Role
2214 :returns: None
2215 """
2216 paths, env = self._init_paths_env(
2217 cluster_name=cluster_uuid, create_if_not_exist=True
2218 )
2219 kubectl = Kubectl(config_file=paths["kube_config"])
2220 await kubectl.create_role(
2221 name=name,
2222 labels={},
2223 namespace=namespace,
2224 api_groups=api_groups,
2225 resources=resources,
2226 verbs=verbs,
2227 )
2228 await kubectl.create_role_binding(
2229 name=name,
2230 labels={},
2231 namespace=namespace,
2232 role_name=name,
2233 sa_name=service_account,
2234 )