Fix bug 2088 by quoting inputs for commands
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 from shlex import quote
26 import random
27 import time
28 import shlex
29 import shutil
30 import stat
31 import os
32 import yaml
33 from uuid import uuid4
34
35 from n2vc.config import EnvironConfig
36 from n2vc.exceptions import K8sException
37 from n2vc.k8s_conn import K8sConnector
38
39
40 class K8sHelmBaseConnector(K8sConnector):
41
42 """
43 ####################################################################################
44 ################################### P U B L I C ####################################
45 ####################################################################################
46 """
47
48 service_account = "osm"
49
50 def __init__(
51 self,
52 fs: object,
53 db: object,
54 kubectl_command: str = "/usr/bin/kubectl",
55 helm_command: str = "/usr/bin/helm",
56 log: object = None,
57 on_update_db=None,
58 ):
59 """
60
61 :param fs: file system for kubernetes and helm configuration
62 :param db: database object to write current operation status
63 :param kubectl_command: path to kubectl executable
64 :param helm_command: path to helm executable
65 :param log: logger
66 :param on_update_db: callback called when k8s connector updates database
67 """
68
69 # parent class
70 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
71
72 self.log.info("Initializing K8S Helm connector")
73
74 self.config = EnvironConfig()
75 # random numbers for release name generation
76 random.seed(time.time())
77
78 # the file system
79 self.fs = fs
80
81 # exception if kubectl is not installed
82 self.kubectl_command = kubectl_command
83 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
84
85 # exception if helm is not installed
86 self._helm_command = helm_command
87 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
88
89 # obtain stable repo url from config or apply default
90 self._stable_repo_url = self.config.get("stablerepourl")
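# the config may return the literal string "None"; normalize it to a real None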
91 if self._stable_repo_url == "None":
92 self._stable_repo_url = None
93
94 # Lock to avoid concurrent execution of helm commands
95 self.cmd_lock = asyncio.Lock()
96
97 def _get_namespace(self, cluster_uuid: str) -> str:
98 """
99 Obtains the namespace used by the cluster whose uuid is passed as argument
100 
101 :param cluster_uuid: cluster's uuid
102 """
103
104 # first, obtain the cluster corresponding to the uuid passed by argument
105 k8scluster = self.db.get_one(
106 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
107 )
108 return k8scluster.get("namespace")
109
110 async def init_env(
111 self,
112 k8s_creds: str,
113 namespace: str = "kube-system",
114 reuse_cluster_uuid=None,
115 **kwargs,
116 ) -> (str, bool):
117 """
118 It prepares a given K8s cluster environment to run Charts
119
120 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
121 '.kube/config'
122 :param namespace: optional namespace to be used for helm. By default,
123 'kube-system' will be used
124 :param reuse_cluster_uuid: existing cluster uuid for reuse
125 :param kwargs: Additional parameters (None yet)
126 :return: uuid of the K8s cluster and True if connector has installed some
127 software in the cluster
128 (on error, an exception will be raised)
129 """
130
131 if reuse_cluster_uuid:
132 cluster_id = reuse_cluster_uuid
133 else:
134 cluster_id = str(uuid4())
135
136 self.log.debug(
137 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
138 )
139
140 paths, env = self._init_paths_env(
141 cluster_name=cluster_id, create_if_not_exist=True
142 )
143 # the kubeconfig holds cluster credentials: write it and restrict it to owner read/write
144 with open(paths["kube_config"], "w") as f:
145 f.write(k8s_creds)
146 os.chmod(paths["kube_config"], stat.S_IRUSR | stat.S_IWUSR)
147
148 # Code with initialization specific of helm version
149 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
150
151 # sync fs with local data
152 self.fs.reverse_sync(from_path=cluster_id)
153
154 self.log.info("Cluster {} initialized".format(cluster_id))
155
156 return cluster_id, n2vc_installed_sw
157
158 async def repo_add(
159 self,
160 cluster_uuid: str,
161 name: str,
162 url: str,
163 repo_type: str = "chart",
164 cert: str = None,
165 user: str = None,
166 password: str = None,
167 ):
168 self.log.debug(
169 "Cluster {}, adding {} repository {}. URL: {}".format(
170 cluster_uuid, repo_type, name, url
171 )
172 )
173
174 # init_env
175 paths, env = self._init_paths_env(
176 cluster_name=cluster_uuid, create_if_not_exist=True
177 )
178
179 # sync local dir
180 self.fs.sync(from_path=cluster_uuid)
181
182 # helm repo add name url
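# user-controlled values (repo name, url, credentials, cert path) are passed through
# shlex.quote() so that spaces or shell metacharacters cannot inject extra arguments
# into the command line (bug 2088)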
183 command = ("env KUBECONFIG={} {} repo add {} {}").format(
184 paths["kube_config"], self._helm_command, quote(name), quote(url)
185 )
186
187 if cert:
188 temp_cert_file = os.path.join(
189 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
190 )
191 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
192 with open(temp_cert_file, "w") as the_cert:
193 the_cert.write(cert)
194 command += " --ca-file {}".format(quote(temp_cert_file))
195
196 if user:
197 command += " --username={}".format(quote(user))
198
199 if password:
200 command += " --password={}".format(quote(password))
201
202 self.log.debug("adding repo: {}".format(command))
203 await self._local_async_exec(
204 command=command, raise_exception_on_error=True, env=env
205 )
206
207 # helm repo update
208 command = "env KUBECONFIG={} {} repo update {}".format(
209 paths["kube_config"], self._helm_command, quote(name)
210 )
211 self.log.debug("updating repo: {}".format(command))
212 await self._local_async_exec(
213 command=command, raise_exception_on_error=False, env=env
214 )
215
216 # sync fs
217 self.fs.reverse_sync(from_path=cluster_uuid)
218
219 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
220 self.log.debug(
221 "Cluster {}, updating {} repository {}".format(
222 cluster_uuid, repo_type, name
223 )
224 )
225
226 # init_env
227 paths, env = self._init_paths_env(
228 cluster_name=cluster_uuid, create_if_not_exist=True
229 )
230
231 # sync local dir
232 self.fs.sync(from_path=cluster_uuid)
233
234 # helm repo update
235 command = "{} repo update {}".format(self._helm_command, quote(name))
236 self.log.debug("updating repo: {}".format(command))
237 await self._local_async_exec(
238 command=command, raise_exception_on_error=False, env=env
239 )
240
241 # sync fs
242 self.fs.reverse_sync(from_path=cluster_uuid)
243
244 async def repo_list(self, cluster_uuid: str) -> list:
245 """
246 Get the list of registered repositories
247
248 :return: list of registered repositories: [ (name, url) .... ]
249 """
250
251 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
252
253 # config filename
254 paths, env = self._init_paths_env(
255 cluster_name=cluster_uuid, create_if_not_exist=True
256 )
257
258 # sync local dir
259 self.fs.sync(from_path=cluster_uuid)
260
261 command = "env KUBECONFIG={} {} repo list --output yaml".format(
262 paths["kube_config"], self._helm_command
263 )
264
265 # Set raise_exception_on_error to False: if there are no repos we just want an empty list
266 output, _rc = await self._local_async_exec(
267 command=command, raise_exception_on_error=False, env=env
268 )
269
270 # sync fs
271 self.fs.reverse_sync(from_path=cluster_uuid)
272
273 if _rc == 0:
274 if output and len(output) > 0:
275 repos = yaml.load(output, Loader=yaml.SafeLoader)
276 # unify format between helm2 and helm3 setting all keys lowercase
277 return self._lower_keys_list(repos)
278 else:
279 return []
280 else:
281 return []
282
283 async def repo_remove(self, cluster_uuid: str, name: str):
284 self.log.debug(
285 "remove {} repositories for cluster {}".format(name, cluster_uuid)
286 )
287
288 # init env, paths
289 paths, env = self._init_paths_env(
290 cluster_name=cluster_uuid, create_if_not_exist=True
291 )
292
293 # sync local dir
294 self.fs.sync(from_path=cluster_uuid)
295
296 command = "env KUBECONFIG={} {} repo remove {}".format(
297 paths["kube_config"], self._helm_command, quote(name)
298 )
299 await self._local_async_exec(
300 command=command, raise_exception_on_error=True, env=env
301 )
302
303 # sync fs
304 self.fs.reverse_sync(from_path=cluster_uuid)
305
306 async def reset(
307 self,
308 cluster_uuid: str,
309 force: bool = False,
310 uninstall_sw: bool = False,
311 **kwargs,
312 ) -> bool:
313 """Reset a cluster
314
315 Resets the Kubernetes cluster by removing the helm deployment that represents it.
316
317 :param cluster_uuid: The UUID of the cluster to reset
318 :param force: Boolean to force the reset
319 :param uninstall_sw: Boolean to uninstall the software installed by the connector (e.g. Tiller for Helm v2)
320 :param kwargs: Additional parameters (None yet)
321 :return: Returns True if successful or raises an exception.
322 """
323 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
324 self.log.debug(
325 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
326 cluster_uuid, uninstall_sw
327 )
328 )
329
330 # sync local dir
331 self.fs.sync(from_path=cluster_uuid)
332
333 # uninstall releases if needed.
334 if uninstall_sw:
335 releases = await self.instances_list(cluster_uuid=cluster_uuid)
336 if len(releases) > 0:
337 if force:
338 for r in releases:
339 try:
340 kdu_instance = r.get("name")
341 chart = r.get("chart")
342 self.log.debug(
343 "Uninstalling {} -> {}".format(chart, kdu_instance)
344 )
345 await self.uninstall(
346 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
347 )
348 except Exception as e:
349 # do not raise an exception: uninstalling releases installed by
350 # previous versions has been found to raise errors in some cases,
351 # so the reset continues anyway
352 self.log.warn(
353 "Error uninstalling release {}: {}".format(
354 kdu_instance, e
355 )
356 )
357 else:
358 msg = (
359 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
360 ).format(cluster_uuid)
361 self.log.warn(msg)
362 uninstall_sw = (
363 False # Allow to remove k8s cluster without removing Tiller
364 )
365
366 if uninstall_sw:
367 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
368
369 # delete cluster directory
370 self.log.debug("Removing directory {}".format(cluster_uuid))
371 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
372 # Also remove the local directory if it still exists
373 direct = self.fs.path + "/" + cluster_uuid
374 shutil.rmtree(direct, ignore_errors=True)
375
376 return True
377
378 def _is_helm_chart_a_file(self, chart_name: str):
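# heuristic: a repo reference such as "bitnami/apache" contains exactly one "/",
# while a local chart path such as "/app/charts/apache" contains more than one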
379 return chart_name.count("/") > 1
380
381 async def _install_impl(
382 self,
383 cluster_id: str,
384 kdu_model: str,
385 paths: dict,
386 env: dict,
387 kdu_instance: str,
388 atomic: bool = True,
389 timeout: float = 300,
390 params: dict = None,
391 db_dict: dict = None,
392 kdu_name: str = None,
393 namespace: str = None,
394 ):
395 # init env, paths
396 paths, env = self._init_paths_env(
397 cluster_name=cluster_id, create_if_not_exist=True
398 )
399
400 # params to str
401 params_str, file_to_delete = self._params_to_file_option(
402 cluster_id=cluster_id, params=params
403 )
404
405 # version
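# a kdu_model such as "repo/chart:1.2.3" is split into the chart reference and the version constraint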
406 kdu_model, version = self._split_version(kdu_model)
407
408 _, repo = self._split_repo(kdu_model)
409 if repo:
410 await self.repo_update(cluster_id, repo)
411
412 command = self._get_install_command(
413 kdu_model,
414 kdu_instance,
415 namespace,
416 params_str,
417 version,
418 atomic,
419 timeout,
420 paths["kube_config"],
421 )
422
423 self.log.debug("installing: {}".format(command))
424
425 if atomic:
426 # exec helm in a task
427 exec_task = asyncio.ensure_future(
428 coro_or_future=self._local_async_exec(
429 command=command, raise_exception_on_error=False, env=env
430 )
431 )
432
433 # write status in another task
434 status_task = asyncio.ensure_future(
435 coro_or_future=self._store_status(
436 cluster_id=cluster_id,
437 kdu_instance=kdu_instance,
438 namespace=namespace,
439 db_dict=db_dict,
440 operation="install",
441 )
442 )
443
444 # wait for execution task
445 await asyncio.wait([exec_task])
446
447 # cancel status task
448 status_task.cancel()
449
450 output, rc = exec_task.result()
451
452 else:
453 output, rc = await self._local_async_exec(
454 command=command, raise_exception_on_error=False, env=env
455 )
456
457 # remove temporal values yaml file
458 if file_to_delete:
459 os.remove(file_to_delete)
460
461 # write final status
462 await self._store_status(
463 cluster_id=cluster_id,
464 kdu_instance=kdu_instance,
465 namespace=namespace,
466 db_dict=db_dict,
467 operation="install",
468 )
469
470 if rc != 0:
471 msg = "Error executing command: {}\nOutput: {}".format(command, output)
472 self.log.error(msg)
473 raise K8sException(msg)
474
475 async def upgrade(
476 self,
477 cluster_uuid: str,
478 kdu_instance: str,
479 kdu_model: str = None,
480 atomic: bool = True,
481 timeout: float = 300,
482 params: dict = None,
483 db_dict: dict = None,
484 ):
485 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
486
487 # sync local dir
488 self.fs.sync(from_path=cluster_uuid)
489
490 # look for instance to obtain namespace
491 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
492 if not instance_info:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance))
494
495 # init env, paths
496 paths, env = self._init_paths_env(
497 cluster_name=cluster_uuid, create_if_not_exist=True
498 )
499
500 # sync local dir
501 self.fs.sync(from_path=cluster_uuid)
502
503 # params to str
504 params_str, file_to_delete = self._params_to_file_option(
505 cluster_id=cluster_uuid, params=params
506 )
507
508 # version
509 kdu_model, version = self._split_version(kdu_model)
510
511 _, repo = self._split_repo(kdu_model)
512 if repo:
513 await self.repo_update(cluster_uuid, repo)
514
515 command = self._get_upgrade_command(
516 kdu_model,
517 kdu_instance,
518 instance_info["namespace"],
519 params_str,
520 version,
521 atomic,
522 timeout,
523 paths["kube_config"],
524 )
525
526 self.log.debug("upgrading: {}".format(command))
527
528 if atomic:
529 # exec helm in a task
530 exec_task = asyncio.ensure_future(
531 coro_or_future=self._local_async_exec(
532 command=command, raise_exception_on_error=False, env=env
533 )
534 )
535 # write status in another task
536 status_task = asyncio.ensure_future(
537 coro_or_future=self._store_status(
538 cluster_id=cluster_uuid,
539 kdu_instance=kdu_instance,
540 namespace=instance_info["namespace"],
541 db_dict=db_dict,
542 operation="upgrade",
543 )
544 )
545
546 # wait for execution task
547 await asyncio.wait([exec_task])
548
549 # cancel status task
550 status_task.cancel()
551 output, rc = exec_task.result()
552
553 else:
554 output, rc = await self._local_async_exec(
555 command=command, raise_exception_on_error=False, env=env
556 )
557
558 # remove temporal values yaml file
559 if file_to_delete:
560 os.remove(file_to_delete)
561
562 # write final status
563 await self._store_status(
564 cluster_id=cluster_uuid,
565 kdu_instance=kdu_instance,
566 namespace=instance_info["namespace"],
567 db_dict=db_dict,
568 operation="upgrade",
569 )
570
571 if rc != 0:
572 msg = "Error executing command: {}\nOutput: {}".format(command, output)
573 self.log.error(msg)
574 raise K8sException(msg)
575
576 # sync fs
577 self.fs.reverse_sync(from_path=cluster_uuid)
578
579 # return new revision number
580 instance = await self.get_instance_info(
581 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
582 )
583 if instance:
584 revision = int(instance.get("revision"))
585 self.log.debug("New revision: {}".format(revision))
586 return revision
587 else:
588 return 0
589
590 async def scale(
591 self,
592 kdu_instance: str,
593 scale: int,
594 resource_name: str,
595 total_timeout: float = 1800,
596 cluster_uuid: str = None,
597 kdu_model: str = None,
598 atomic: bool = True,
599 db_dict: dict = None,
600 **kwargs,
601 ):
602 """Scale a resource in a Helm Chart.
603
604 Args:
605 kdu_instance: KDU instance name
606 scale: Scale to which to set the resource
607 resource_name: Resource name
608 total_timeout: The time, in seconds, to wait
609 cluster_uuid: The UUID of the cluster
610 kdu_model: The chart reference
611 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
612 The --wait flag will be set automatically if --atomic is used
613 db_dict: Dictionary for any additional data
614 kwargs: Additional parameters
615
616 Returns:
617 True if successful, False otherwise
618 """
619
620 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
621 if resource_name:
622 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
623 resource_name, kdu_model, cluster_uuid
624 )
625
626 self.log.debug(debug_mgs)
627
628 # look for instance to obtain namespace
629 # get_instance_info function calls the sync command
630 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
631 if not instance_info:
632 raise K8sException("kdu_instance {} not found".format(kdu_instance))
633
634 # init env, paths
635 paths, env = self._init_paths_env(
636 cluster_name=cluster_uuid, create_if_not_exist=True
637 )
638
639 # version
640 kdu_model, version = self._split_version(kdu_model)
641
642 repo_url = await self._find_repo(kdu_model, cluster_uuid)
643
644 _, replica_str = await self._get_replica_count_url(
645 kdu_model, repo_url, resource_name
646 )
647
648 command = self._get_upgrade_scale_command(
649 kdu_model,
650 kdu_instance,
651 instance_info["namespace"],
652 scale,
653 version,
654 atomic,
655 replica_str,
656 total_timeout,
657 resource_name,
658 paths["kube_config"],
659 )
660
661 self.log.debug("scaling: {}".format(command))
662
663 if atomic:
664 # exec helm in a task
665 exec_task = asyncio.ensure_future(
666 coro_or_future=self._local_async_exec(
667 command=command, raise_exception_on_error=False, env=env
668 )
669 )
670 # write status in another task
671 status_task = asyncio.ensure_future(
672 coro_or_future=self._store_status(
673 cluster_id=cluster_uuid,
674 kdu_instance=kdu_instance,
675 namespace=instance_info["namespace"],
676 db_dict=db_dict,
677 operation="scale",
678 )
679 )
680
681 # wait for execution task
682 await asyncio.wait([exec_task])
683
684 # cancel status task
685 status_task.cancel()
686 output, rc = exec_task.result()
687
688 else:
689 output, rc = await self._local_async_exec(
690 command=command, raise_exception_on_error=False, env=env
691 )
692
693 # write final status
694 await self._store_status(
695 cluster_id=cluster_uuid,
696 kdu_instance=kdu_instance,
697 namespace=instance_info["namespace"],
698 db_dict=db_dict,
699 operation="scale",
700 )
701
702 if rc != 0:
703 msg = "Error executing command: {}\nOutput: {}".format(command, output)
704 self.log.error(msg)
705 raise K8sException(msg)
706
707 # sync fs
708 self.fs.reverse_sync(from_path=cluster_uuid)
709
710 return True
711
712 async def get_scale_count(
713 self,
714 resource_name: str,
715 kdu_instance: str,
716 cluster_uuid: str,
717 kdu_model: str,
718 **kwargs,
719 ) -> int:
720 """Get a resource scale count.
721
722 Args:
723 cluster_uuid: The UUID of the cluster
724 resource_name: Resource name
725 kdu_instance: KDU instance name
726 kdu_model: The name or path of a Helm Chart
727 kwargs: Additional parameters
728
729 Returns:
730 Resource instance count
731 """
732
733 self.log.debug(
734 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
735 )
736
737 # look for instance to obtain namespace
738 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
739 if not instance_info:
740 raise K8sException("kdu_instance {} not found".format(kdu_instance))
741
742 # init env, paths
743 paths, _ = self._init_paths_env(
744 cluster_name=cluster_uuid, create_if_not_exist=True
745 )
746
747 replicas = await self._get_replica_count_instance(
748 kdu_instance=kdu_instance,
749 namespace=instance_info["namespace"],
750 kubeconfig=paths["kube_config"],
751 resource_name=resource_name,
752 )
753
754 self.log.debug(
755 f"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
756 )
757
758 # Get default value if scale count is not found from provided values
759 # Important note: this piece of code shall only be executed in the first scaling operation,
760 # since it is expected that the _get_replica_count_instance is able to obtain the number of
761 # replicas when a scale operation was already conducted previously for this KDU/resource!
762 if replicas is None:
763 repo_url = await self._find_repo(
764 kdu_model=kdu_model, cluster_uuid=cluster_uuid
765 )
766 replicas, _ = await self._get_replica_count_url(
767 kdu_model=kdu_model, repo_url=repo_url, resource_name=resource_name
768 )
769
770 self.log.debug(
771 f"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
772 f"{resource_name} obtained: {replicas}"
773 )
774
775 if replicas is None:
776 msg = "Replica count not found. Cannot be scaled"
777 self.log.error(msg)
778 raise K8sException(msg)
779
780 return int(replicas)
781
782 async def rollback(
783 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
784 ):
785 self.log.debug(
786 "rollback kdu_instance {} to revision {} from cluster {}".format(
787 kdu_instance, revision, cluster_uuid
788 )
789 )
790
791 # sync local dir
792 self.fs.sync(from_path=cluster_uuid)
793
794 # look for instance to obtain namespace
795 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
796 if not instance_info:
797 raise K8sException("kdu_instance {} not found".format(kdu_instance))
798
799 # init env, paths
800 paths, env = self._init_paths_env(
801 cluster_name=cluster_uuid, create_if_not_exist=True
802 )
803
804 # sync local dir
805 self.fs.sync(from_path=cluster_uuid)
806
807 command = self._get_rollback_command(
808 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
809 )
810
811 self.log.debug("rolling_back: {}".format(command))
812
813 # exec helm in a task
814 exec_task = asyncio.ensure_future(
815 coro_or_future=self._local_async_exec(
816 command=command, raise_exception_on_error=False, env=env
817 )
818 )
819 # write status in another task
820 status_task = asyncio.ensure_future(
821 coro_or_future=self._store_status(
822 cluster_id=cluster_uuid,
823 kdu_instance=kdu_instance,
824 namespace=instance_info["namespace"],
825 db_dict=db_dict,
826 operation="rollback",
827 )
828 )
829
830 # wait for execution task
831 await asyncio.wait([exec_task])
832
833 # cancel status task
834 status_task.cancel()
835
836 output, rc = exec_task.result()
837
838 # write final status
839 await self._store_status(
840 cluster_id=cluster_uuid,
841 kdu_instance=kdu_instance,
842 namespace=instance_info["namespace"],
843 db_dict=db_dict,
844 operation="rollback",
845 )
846
847 if rc != 0:
848 msg = "Error executing command: {}\nOutput: {}".format(command, output)
849 self.log.error(msg)
850 raise K8sException(msg)
851
852 # sync fs
853 self.fs.reverse_sync(from_path=cluster_uuid)
854
855 # return new revision number
856 instance = await self.get_instance_info(
857 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
858 )
859 if instance:
860 revision = int(instance.get("revision"))
861 self.log.debug("New revision: {}".format(revision))
862 return revision
863 else:
864 return 0
865
866 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
867 """
868 Removes an existing KDU instance. It implicitly uses the `delete` or `uninstall` call
869 (this call should happen after all _terminate-config-primitive_ of the VNF
870 are invoked).
871
872 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
873 :param kdu_instance: unique name for the KDU instance to be deleted
874 :param kwargs: Additional parameters (None yet)
875 :return: True if successful
876 """
877
878 self.log.debug(
879 "uninstall kdu_instance {} from cluster {}".format(
880 kdu_instance, cluster_uuid
881 )
882 )
883
884 # sync local dir
885 self.fs.sync(from_path=cluster_uuid)
886
887 # look for instance to obtain namespace
888 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
889 if not instance_info:
890 self.log.warning("kdu_instance {} not found".format(kdu_instance))
891 return True
892 # init env, paths
893 paths, env = self._init_paths_env(
894 cluster_name=cluster_uuid, create_if_not_exist=True
895 )
896
897 # sync local dir
898 self.fs.sync(from_path=cluster_uuid)
899
900 command = self._get_uninstall_command(
901 kdu_instance, instance_info["namespace"], paths["kube_config"]
902 )
903 output, _rc = await self._local_async_exec(
904 command=command, raise_exception_on_error=True, env=env
905 )
906
907 # sync fs
908 self.fs.reverse_sync(from_path=cluster_uuid)
909
910 return self._output_to_table(output)
911
912 async def instances_list(self, cluster_uuid: str) -> list:
913 """
914 returns a list of deployed releases in a cluster
915
916 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
917 :return:
918 """
919
920 self.log.debug("list releases for cluster {}".format(cluster_uuid))
921
922 # sync local dir
923 self.fs.sync(from_path=cluster_uuid)
924
925 # execute internal command
926 result = await self._instances_list(cluster_uuid)
927
928 # sync fs
929 self.fs.reverse_sync(from_path=cluster_uuid)
930
931 return result
932
933 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
934 instances = await self.instances_list(cluster_uuid=cluster_uuid)
935 for instance in instances:
936 if instance.get("name") == kdu_instance:
937 return instance
938 self.log.debug("Instance {} not found".format(kdu_instance))
939 return None
940
941 async def upgrade_charm(
942 self,
943 ee_id: str = None,
944 path: str = None,
945 charm_id: str = None,
946 charm_type: str = None,
947 timeout: float = None,
948 ) -> str:
949 """This method upgrade charms in VNFs
950
951 Args:
952 ee_id: Execution environment id
953 path: Local path to the charm
954 charm_id: charm-id
955 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
956 timeout: (Float) Timeout for the ns update operation
957
958 Returns:
959 The output of the update operation if status equals to "completed"
960 """
961 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
962
963 async def exec_primitive(
964 self,
965 cluster_uuid: str = None,
966 kdu_instance: str = None,
967 primitive_name: str = None,
968 timeout: float = 300,
969 params: dict = None,
970 db_dict: dict = None,
971 **kwargs,
972 ) -> str:
973 """Exec primitive (Juju action)
974
975 :param cluster_uuid: The UUID of the cluster or namespace:cluster
976 :param kdu_instance: The unique name of the KDU instance
977 :param primitive_name: Name of action that will be executed
978 :param timeout: Timeout for action execution
979 :param params: Dictionary of all the parameters needed for the action
980 :param db_dict: Dictionary for any additional data
981 :param kwargs: Additional parameters (None yet)
982
983 :return: Returns the output of the action
984 """
985 raise K8sException(
986 "KDUs deployed with Helm don't support actions "
987 "different from rollback, upgrade and status"
988 )
989
990 async def get_services(
991 self, cluster_uuid: str, kdu_instance: str, namespace: str
992 ) -> list:
993 """
994 Returns a list of services defined for the specified kdu instance.
995
996 :param cluster_uuid: UUID of a K8s cluster known by OSM
997 :param kdu_instance: unique name for the KDU instance
998 :param namespace: K8s namespace used by the KDU instance
999 :return: If successful, it will return a list of services. Each service
1000 can have the following data:
1001 - `name` of the service
1002 - `type` type of service in the k8s cluster
1003 - `ports` List of ports offered by the service, for each port includes at least
1004 name, port, protocol
1005 - `cluster_ip` Internal ip to be used inside k8s cluster
1006 - `external_ip` List of external ips (in case they are available)
1007 """
1008
1009 self.log.debug(
1010 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1011 cluster_uuid, kdu_instance
1012 )
1013 )
1014
1015 # init env, paths
1016 paths, env = self._init_paths_env(
1017 cluster_name=cluster_uuid, create_if_not_exist=True
1018 )
1019
1020 # sync local dir
1021 self.fs.sync(from_path=cluster_uuid)
1022
1023 # get list of services names for kdu
1024 service_names = await self._get_services(
1025 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1026 )
1027
1028 service_list = []
1029 for service in service_names:
1030 service = await self._get_service(cluster_uuid, service, namespace)
1031 service_list.append(service)
1032
1033 # sync fs
1034 self.fs.reverse_sync(from_path=cluster_uuid)
1035
1036 return service_list
1037
1038 async def get_service(
1039 self, cluster_uuid: str, service_name: str, namespace: str
1040 ) -> object:
1041 self.log.debug(
1042 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1043 service_name, namespace, cluster_uuid
1044 )
1045 )
1046
1047 # sync local dir
1048 self.fs.sync(from_path=cluster_uuid)
1049
1050 service = await self._get_service(cluster_uuid, service_name, namespace)
1051
1052 # sync fs
1053 self.fs.reverse_sync(from_path=cluster_uuid)
1054
1055 return service
1056
1057 async def status_kdu(
1058 self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
1059 ) -> Union[str, dict]:
1060 """
1061 This call retrieves the current state of a given KDU instance. It allows
1062 retrieving the _composition_ (i.e. K8s objects) and the _specific
1063 values_ of the configuration parameters applied to a given instance. This call
1064 is based on the `status` call.
1065
1066 :param cluster_uuid: UUID of a K8s cluster known by OSM
1067 :param kdu_instance: unique name for the KDU instance
1068 :param kwargs: Additional parameters (None yet)
1069 :param yaml_format: whether the result shall be returned as a YAML string or as a
1070 dictionary
1071 :return: If successful, it will return the following vector of arguments:
1072 - K8s `namespace` in the cluster where the KDU lives
1073 - `state` of the KDU instance. It can be:
1074 - UNKNOWN
1075 - DEPLOYED
1076 - DELETED
1077 - SUPERSEDED
1078 - FAILED or
1079 - DELETING
1080 - List of `resources` (objects) that this release consists of, sorted by kind,
1081 and the status of those resources
1082 - Last `deployment_time`.
1083
1084 """
1085 self.log.debug(
1086 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1087 cluster_uuid, kdu_instance
1088 )
1089 )
1090
1091 # sync local dir
1092 self.fs.sync(from_path=cluster_uuid)
1093
1094 # get instance: needed to obtain namespace
1095 instances = await self._instances_list(cluster_id=cluster_uuid)
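# for/else: the else branch only runs if the loop finishes without break,
# i.e. the requested instance was not found in the cluster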
1096 for instance in instances:
1097 if instance.get("name") == kdu_instance:
1098 break
1099 else:
1100 # instance does not exist
1101 raise K8sException(
1102 "Instance name: {} not found in cluster: {}".format(
1103 kdu_instance, cluster_uuid
1104 )
1105 )
1106
1107 status = await self._status_kdu(
1108 cluster_id=cluster_uuid,
1109 kdu_instance=kdu_instance,
1110 namespace=instance["namespace"],
1111 yaml_format=yaml_format,
1112 show_error_log=True,
1113 )
1114
1115 # sync fs
1116 self.fs.reverse_sync(from_path=cluster_uuid)
1117
1118 return status
1119
1120 async def get_values_kdu(
1121 self, kdu_instance: str, namespace: str, kubeconfig: str
1122 ) -> str:
1123 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1124
1125 return await self._exec_get_command(
1126 get_command="values",
1127 kdu_instance=kdu_instance,
1128 namespace=namespace,
1129 kubeconfig=kubeconfig,
1130 )
1131
1132 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1133 """Method to obtain the Helm Chart package's values
1134
1135 Args:
1136 kdu_model: The name or path of a Helm Chart
1137 repo_url: Helm Chart repository url
1138
1139 Returns:
1140 str: the values of the Helm Chart package
1141 """
1142
1143 self.log.debug(
1144 "inspect kdu_model values {} from (optional) repo: {}".format(
1145 kdu_model, repo_url
1146 )
1147 )
1148
1149 return await self._exec_inspect_command(
1150 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1151 )
1152
1153 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1154 self.log.debug(
1155 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1156 )
1157
1158 return await self._exec_inspect_command(
1159 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1160 )
1161
1162 async def synchronize_repos(self, cluster_uuid: str):
1163 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1164 try:
1165 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1166 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1167
1168 local_repo_list = await self.repo_list(cluster_uuid)
1169 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1170
1171 deleted_repo_list = []
1172 added_repo_dict = {}
1173
1174 # iterate over the list of repos in the database that should be
1175 # added if not present
1176 for repo_name, db_repo in db_repo_dict.items():
1177 try:
1178 # check if it is already present
1179 curr_repo_url = local_repo_dict.get(db_repo["name"])
1180 repo_id = db_repo.get("_id")
1181 if curr_repo_url != db_repo["url"]:
1182 if curr_repo_url:
1183 self.log.debug(
1184 "repo {} url changed, delete and and again".format(
1185 db_repo["url"]
1186 )
1187 )
1188 await self.repo_remove(cluster_uuid, db_repo["name"])
1189 deleted_repo_list.append(repo_id)
1190
1191 # add repo
1192 self.log.debug("add repo {}".format(db_repo["name"]))
1193 if "ca_cert" in db_repo:
1194 await self.repo_add(
1195 cluster_uuid,
1196 db_repo["name"],
1197 db_repo["url"],
1198 cert=db_repo["ca_cert"],
1199 )
1200 else:
1201 await self.repo_add(
1202 cluster_uuid,
1203 db_repo["name"],
1204 db_repo["url"],
1205 )
1206 added_repo_dict[repo_id] = db_repo["name"]
1207 except Exception as e:
1208 raise K8sException(
1209 "Error adding repo id: {}, err_msg: {} ".format(
1210 repo_id, repr(e)
1211 )
1212 )
1213
1214 # Delete repos that are present but not in nbi_list
1215 for repo_name in local_repo_dict:
1216 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1217 self.log.debug("delete repo {}".format(repo_name))
1218 try:
1219 await self.repo_remove(cluster_uuid, repo_name)
1220 deleted_repo_list.append(repo_name)
1221 except Exception as e:
1222 self.log.warning(
1223 "Error deleting repo, name: {}, err_msg: {}".format(
1224 repo_name, str(e)
1225 )
1226 )
1227
1228 return deleted_repo_list, added_repo_dict
1229
1230 except K8sException:
1231 raise
1232 except Exception as e:
1233 # Do not raise K8sException when synchronizing repos; wrap it in a generic Exception
1234 self.log.error("Error synchronizing repos: {}".format(e))
1235 raise Exception("Error synchronizing repos: {}".format(e))
1236
1237 def _get_db_repos_dict(self, repo_ids: list):
1238 db_repos_dict = {}
1239 for repo_id in repo_ids:
1240 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1241 db_repos_dict[db_repo["name"]] = db_repo
1242 return db_repos_dict
1243
1244 """
1245 ####################################################################################
1246 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1247 ####################################################################################
1248 """
1249
1250 @abc.abstractmethod
1251 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1252 """
1253 Creates and returns the base cluster and kube dirs.
1254 Also creates helm3 dirs according to the new directory specification; these paths are
1255 not returned but assigned to helm environment variables
1256
1257 :param cluster_name: cluster_name
1258 :return: Dictionary with config_paths and dictionary with helm environment variables
1259 """
1260
1261 @abc.abstractmethod
1262 async def _cluster_init(self, cluster_id, namespace, paths, env):
1263 """
1264 Implements the helm version dependent cluster initialization
1265 """
1266
1267 @abc.abstractmethod
1268 async def _instances_list(self, cluster_id):
1269 """
1270 Implements the helm version dependent helm instances list
1271 """
1272
1273 @abc.abstractmethod
1274 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1275 """
1276 Implements the helm version dependent method to obtain services from a helm instance
1277 """
1278
1279 @abc.abstractmethod
1280 async def _status_kdu(
1281 self,
1282 cluster_id: str,
1283 kdu_instance: str,
1284 namespace: str = None,
1285 yaml_format: bool = False,
1286 show_error_log: bool = False,
1287 ) -> Union[str, dict]:
1288 """
1289 Implements the helm version dependent method to obtain status of a helm instance
1290 """
1291
1292 @abc.abstractmethod
1293 def _get_install_command(
1294 self,
1295 kdu_model,
1296 kdu_instance,
1297 namespace,
1298 params_str,
1299 version,
1300 atomic,
1301 timeout,
1302 kubeconfig,
1303 ) -> str:
1304 """
1305 Obtain command to be executed to install the indicated instance
1306 """
1307
1308 @abc.abstractmethod
1309 def _get_upgrade_scale_command(
1310 self,
1311 kdu_model,
1312 kdu_instance,
1313 namespace,
1314 count,
1315 version,
1316 atomic,
1317 replicas,
1318 timeout,
1319 resource_name,
1320 kubeconfig,
1321 ) -> str:
1322 """Generates the command to scale a Helm Chart release
1323
1324 Args:
1325 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1326 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1327 namespace (str): Namespace where this KDU instance is deployed
1328 count (int): Scale count
1329 version (str): Constraint with specific version of the Chart to use
1330 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1331 The --wait flag will be set automatically if --atomic is used
1332 replicas (str): The key under the resource_name key where the scale count is stored
1333 timeout (float): The time, in seconds, to wait
1334 resource_name (str): The KDU's resource to scale
1335 kubeconfig (str): Kubeconfig file path
1336
1337 Returns:
1338 str: command to scale a Helm Chart release
1339 """
1340
1341 @abc.abstractmethod
1342 def _get_upgrade_command(
1343 self,
1344 kdu_model,
1345 kdu_instance,
1346 namespace,
1347 params_str,
1348 version,
1349 atomic,
1350 timeout,
1351 kubeconfig,
1352 ) -> str:
1353 """Generates the command to upgrade a Helm Chart release
1354
1355 Args:
1356 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1357 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1358 namespace (str): Namespace where this KDU instance is deployed
1359 params_str (str): Params used to upgrade the Helm Chart release
1360 version (str): Constraint with specific version of the Chart to use
1361 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1362 The --wait flag will be set automatically if --atomic is used
1363 timeout (float): The time, in seconds, to wait
1364 kubeconfig (str): Kubeconfig file path
1365
1366 Returns:
1367 str: command to upgrade a Helm Chart release
1368 """
1369
1370 @abc.abstractmethod
1371 def _get_rollback_command(
1372 self, kdu_instance, namespace, revision, kubeconfig
1373 ) -> str:
1374 """
1375 Obtain command to be executed to rollback the indicated instance
1376 """
1377
1378 @abc.abstractmethod
1379 def _get_uninstall_command(
1380 self, kdu_instance: str, namespace: str, kubeconfig: str
1381 ) -> str:
1382 """
1383 Obtain command to be executed to delete the indicated instance
1384 """
1385
1386 @abc.abstractmethod
1387 def _get_inspect_command(
1388 self, show_command: str, kdu_model: str, repo_str: str, version: str
1389 ):
1390 """Generates the command to obtain the information about an Helm Chart package
1391 (´helm show ...´ command)
1392
1393 Args:
1394 show_command: the second part of the command (`helm show <show_command>`)
1395 kdu_model: The name or path of an Helm Chart
1396 repo_url: Helm Chart repository url
1397 version: constraint with specific version of the Chart to use
1398
1399 Returns:
1400 str: the generated Helm Chart command
1401 """
1402
1403 @abc.abstractmethod
1404 def _get_get_command(
1405 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1406 ):
1407 """Obtain command to be executed to get information about the kdu instance."""
1408
1409 @abc.abstractmethod
1410 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1411 """
1412 Method called to uninstall cluster software for helm. This method depends
1413 on the helm version
1414 For Helm v2 it will be called when Tiller must be uninstalled
1415 For Helm v3 it does nothing and does not need to be called
1416 """
1417
1418 @abc.abstractmethod
1419 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1420 """
1421 Obtains the cluster repos identifiers
1422 """
1423
1424 """
1425 ####################################################################################
1426 ################################### P R I V A T E ##################################
1427 ####################################################################################
1428 """
1429
1430 @staticmethod
1431 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1432 if os.path.exists(filename):
1433 return True
1434 else:
1435 msg = "File {} does not exist".format(filename)
1436 if exception_if_not_exists:
1437 raise K8sException(msg)
1438
1439 @staticmethod
1440 def _remove_multiple_spaces(strobj):
1441 strobj = strobj.strip()
1442 while " " in strobj:
1443 strobj = strobj.replace(" ", " ")
1444 return strobj
1445
1446 @staticmethod
1447 def _output_to_lines(output: str) -> list:
1448 output_lines = list()
1449 lines = output.splitlines(keepends=False)
1450 for line in lines:
1451 line = line.strip()
1452 if len(line) > 0:
1453 output_lines.append(line)
1454 return output_lines
1455
1456 @staticmethod
1457 def _output_to_table(output: str) -> list:
1458 output_table = list()
1459 lines = output.splitlines(keepends=False)
1460 for line in lines:
1461 line = line.replace("\t", " ")
1462 line_list = list()
1463 output_table.append(line_list)
1464 cells = line.split(sep=" ")
1465 for cell in cells:
1466 cell = cell.strip()
1467 if len(cell) > 0:
1468 line_list.append(cell)
1469 return output_table
1470
1471 @staticmethod
1472 def _parse_services(output: str) -> list:
1473 lines = output.splitlines(keepends=False)
1474 services = []
1475 for line in lines:
1476 line = line.replace("\t", " ")
1477 cells = line.split(sep=" ")
1478 if len(cells) > 0 and cells[0].startswith("service/"):
1479 elems = cells[0].split(sep="/")
1480 if len(elems) > 1:
1481 services.append(elems[1])
1482 return services
1483
1484 @staticmethod
1485 def _get_deep(dictionary: dict, members: tuple):
1486 target = dictionary
1487 value = None
1488 try:
1489 for m in members:
1490 value = target.get(m)
1491 if not value:
1492 return None
1493 else:
1494 target = value
1495 except Exception:
1496 pass
1497 return value
1498
1499 # find key:value in several lines
1500 @staticmethod
1501 def _find_in_lines(p_lines: list, p_key: str) -> str:
1502 for line in p_lines:
1503 try:
1504 if line.startswith(p_key + ":"):
1505 parts = line.split(":")
1506 the_value = parts[1].strip()
1507 return the_value
1508 except Exception:
1509 # ignore it
1510 pass
1511 return None
1512
1513 @staticmethod
1514 def _lower_keys_list(input_list: list):
1515 """
1516 Transform the keys in a list of dictionaries to lower case and returns a new list
1517 of dictionaries
1518 """
1519 new_list = []
1520 if input_list:
1521 for dictionary in input_list:
1522 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1523 new_list.append(new_dict)
1524 return new_list
1525
1526 async def _local_async_exec(
1527 self,
1528 command: str,
1529 raise_exception_on_error: bool = False,
1530 show_error_log: bool = True,
1531 encode_utf8: bool = False,
1532 env: dict = None,
1533 ) -> (str, int):
1534 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1535 self.log.debug(
1536 "Executing async local command: {}, env: {}".format(command, env)
1537 )
1538
1539 # split command
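# shlex.split() parses the quoting applied with shlex.quote() when the command was built,
# so values containing spaces or shell metacharacters stay as single arguments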
1540 command = shlex.split(command)
1541
1542 environ = os.environ.copy()
1543 if env:
1544 environ.update(env)
1545
1546 try:
1547 async with self.cmd_lock:
1548 process = await asyncio.create_subprocess_exec(
1549 *command,
1550 stdout=asyncio.subprocess.PIPE,
1551 stderr=asyncio.subprocess.PIPE,
1552 env=environ,
1553 )
1554
1555 # wait for command terminate
1556 stdout, stderr = await process.communicate()
1557
1558 return_code = process.returncode
1559
1560 output = ""
1561 if stdout:
1562 output = stdout.decode("utf-8").strip()
1563 # output = stdout.decode()
1564 if stderr:
1565 output = stderr.decode("utf-8").strip()
1566 # output = stderr.decode()
1567
1568 if return_code != 0 and show_error_log:
1569 self.log.debug(
1570 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1571 )
1572 else:
1573 self.log.debug("Return code: {}".format(return_code))
1574
1575 if raise_exception_on_error and return_code != 0:
1576 raise K8sException(output)
1577
1578 if encode_utf8:
1579 output = output.encode("utf-8").strip()
1580 output = str(output).replace("\\n", "\n")
1581
1582 return output, return_code
1583
1584 except asyncio.CancelledError:
1585 raise
1586 except K8sException:
1587 raise
1588 except Exception as e:
1589 msg = "Exception executing command: {} -> {}".format(command, e)
1590 self.log.error(msg)
1591 if raise_exception_on_error:
1592 raise K8sException(e) from e
1593 else:
1594 return "", -1
1595
1596 async def _local_async_exec_pipe(
1597 self,
1598 command1: str,
1599 command2: str,
1600 raise_exception_on_error: bool = True,
1601 show_error_log: bool = True,
1602 encode_utf8: bool = False,
1603 env: dict = None,
1604 ):
1605 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1606 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1607 command = "{} | {}".format(command1, command2)
1608 self.log.debug(
1609 "Executing async local command: {}, env: {}".format(command, env)
1610 )
1611
1612 # split command
1613 command1 = shlex.split(command1)
1614 command2 = shlex.split(command2)
1615
1616 environ = os.environ.copy()
1617 if env:
1618 environ.update(env)
1619
1620 try:
1621 async with self.cmd_lock:
1622 read, write = os.pipe()
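# connect both subprocesses through an OS pipe: command1 writes to the pipe, command2 reads from it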
1623 await asyncio.create_subprocess_exec(
1624 *command1, stdout=write, env=environ
1625 )
1626 os.close(write)
1627 process_2 = await asyncio.create_subprocess_exec(
1628 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1629 )
1630 os.close(read)
1631 stdout, stderr = await process_2.communicate()
1632
1633 return_code = process_2.returncode
1634
1635 output = ""
1636 if stdout:
1637 output = stdout.decode("utf-8").strip()
1638 # output = stdout.decode()
1639 if stderr:
1640 output = stderr.decode("utf-8").strip()
1641 # output = stderr.decode()
1642
1643 if return_code != 0 and show_error_log:
1644 self.log.debug(
1645 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1646 )
1647 else:
1648 self.log.debug("Return code: {}".format(return_code))
1649
1650 if raise_exception_on_error and return_code != 0:
1651 raise K8sException(output)
1652
1653 if encode_utf8:
1654 output = output.encode("utf-8").strip()
1655 output = str(output).replace("\\n", "\n")
1656
1657 return output, return_code
1658 except asyncio.CancelledError:
1659 raise
1660 except K8sException:
1661 raise
1662 except Exception as e:
1663 msg = "Exception executing command: {} -> {}".format(command, e)
1664 self.log.error(msg)
1665 if raise_exception_on_error:
1666 raise K8sException(e) from e
1667 else:
1668 return "", -1
1669
1670 async def _get_service(self, cluster_id, service_name, namespace):
1671 """
1672 Obtains the data of the specified service in the k8cluster.
1673
1674 :param cluster_id: id of a K8s cluster known by OSM
1675 :param service_name: name of the K8s service in the specified namespace
1676 :param namespace: K8s namespace used by the KDU instance
1677 :return: If successful, it will return a service with the following data:
1678 - `name` of the service
1679 - `type` type of service in the k8s cluster
1680 - `ports` List of ports offered by the service, for each port includes at least
1681 name, port, protocol
1682 - `cluster_ip` Internal ip to be used inside k8s cluster
1683 - `external_ip` List of external ips (in case they are available)
1684 """
1685
1686 # init config, env
1687 paths, env = self._init_paths_env(
1688 cluster_name=cluster_id, create_if_not_exist=True
1689 )
1690
1691 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1692 self.kubectl_command,
1693 paths["kube_config"],
1694 quote(namespace),
1695 quote(service_name),
1696 )
1697
1698 output, _rc = await self._local_async_exec(
1699 command=command, raise_exception_on_error=True, env=env
1700 )
1701
1702 data = yaml.load(output, Loader=yaml.SafeLoader)
1703
1704 service = {
1705 "name": service_name,
1706 "type": self._get_deep(data, ("spec", "type")),
1707 "ports": self._get_deep(data, ("spec", "ports")),
1708 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1709 }
1710 if service["type"] == "LoadBalancer":
1711 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1712 ip_list = [elem["ip"] for elem in ip_map_list]
1713 service["external_ip"] = ip_list
1714
1715 return service
1716
1717 async def _exec_get_command(
1718 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1719 ):
1720 """Obtains information about the kdu instance."""
1721
1722 full_command = self._get_get_command(
1723 get_command, kdu_instance, namespace, kubeconfig
1724 )
1725
1726 output, _rc = await self._local_async_exec(command=full_command)
1727
1728 return output
1729
1730 async def _exec_inspect_command(
1731 self, inspect_command: str, kdu_model: str, repo_url: str = None
1732 ):
1733 """Obtains information about an Helm Chart package (´helm show´ command)
1734
1735 Args:
1736 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1737 kdu_model: The name or path of an Helm Chart
1738 repo_url: Helm Chart repository url
1739
1740 Returns:
1741 str: the requested info about the Helm Chart package
1742 """
1743
1744 repo_str = ""
1745 if repo_url:
1746 repo_str = " --repo {}".format(quote(repo_url))
1747
1748 # Obtain the Chart's name and store it in the var kdu_model
1749 kdu_model, _ = self._split_repo(kdu_model=kdu_model)
1750
1751 kdu_model, version = self._split_version(kdu_model)
1752 if version:
1753 version_str = "--version {}".format(quote(version))
1754 else:
1755 version_str = ""
1756
1757 full_command = self._get_inspect_command(
1758 show_command=inspect_command,
1759 kdu_model=quote(kdu_model),
1760 repo_str=repo_str,
1761 version=version_str,
1762 )
1763
1764 output, _ = await self._local_async_exec(command=full_command)
1765
1766 return output
1767
1768 async def _get_replica_count_url(
1769 self,
1770 kdu_model: str,
1771 repo_url: str = None,
1772 resource_name: str = None,
1773 ) -> (int, str):
1774 """Get the replica count value in the Helm Chart Values.
1775
1776 Args:
1777 kdu_model: The name or path of a Helm Chart
1778 repo_url: Helm Chart repository url
1779 resource_name: Resource name
1780
1781 Returns:
1782 A tuple with:
1783 - The number of replicas of the specific instance; if not found, returns None; and
1784 - The string corresponding to the replica count key in the Helm values
1785 """
1786
1787 kdu_values = yaml.load(
1788 await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
1789 Loader=yaml.SafeLoader,
1790 )
1791
1792 self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")
1793
1794 if not kdu_values:
1795 raise K8sException(
1796 "kdu_values not found for kdu_model {}".format(kdu_model)
1797 )
1798
1799 if resource_name:
1800 kdu_values = kdu_values.get(resource_name, None)
1801
1802 if not kdu_values:
1803 msg = "resource {} not found in the values in model {}".format(
1804 resource_name, kdu_model
1805 )
1806 self.log.error(msg)
1807 raise K8sException(msg)
1808
1809 duplicate_check = False
1810
1811 replica_str = ""
1812 replicas = None
1813
1814 if kdu_values.get("replicaCount") is not None:
1815 replicas = kdu_values["replicaCount"]
1816 replica_str = "replicaCount"
1817 elif kdu_values.get("replicas") is not None:
1818 duplicate_check = True
1819 replicas = kdu_values["replicas"]
1820 replica_str = "replicas"
1821 else:
1822 if resource_name:
1823 msg = (
1824 "replicaCount or replicas not found in the resource"
1825 "{} values in model {}. Cannot be scaled".format(
1826 resource_name, kdu_model
1827 )
1828 )
1829 else:
1830 msg = (
1831 "replicaCount or replicas not found in the values"
1832 "in model {}. Cannot be scaled".format(kdu_model)
1833 )
1834 self.log.error(msg)
1835 raise K8sException(msg)
1836
1837 # Check whether replicas and replicaCount exist at the same time
1838 msg = "replicaCount and replicas exist at the same time"
1839 if duplicate_check:
1840 if "replicaCount" in kdu_values:
1841 self.log.error(msg)
1842 raise K8sException(msg)
1843 else:
1844 if "replicas" in kdu_values:
1845 self.log.error(msg)
1846 raise K8sException(msg)
1847
1848 return replicas, replica_str
1849
1850 async def _get_replica_count_instance(
1851 self,
1852 kdu_instance: str,
1853 namespace: str,
1854 kubeconfig: str,
1855 resource_name: str = None,
1856 ) -> int:
1857 """Get the replica count value in the instance.
1858
1859 Args:
1860 kdu_instance: The name of the KDU instance
1861 namespace: KDU instance namespace
1862 kubeconfig:
1863 resource_name: Resource name
1864
1865 Returns:
1866 The number of replicas of the specific instance; if not found, returns None
1867 """
1868
1869 kdu_values = yaml.load(
1870 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1871 Loader=yaml.SafeLoader,
1872 )
1873
1874 self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")
1875
1876 replicas = None
1877
1878 if kdu_values:
1879 resource_values = (
1880 kdu_values.get(resource_name, None) if resource_name else None
1881 )
1882
1883 for replica_str in ("replicaCount", "replicas"):
1884 if resource_values:
1885 replicas = resource_values.get(replica_str)
1886 else:
1887 replicas = kdu_values.get(replica_str)
1888
1889 if replicas is not None:
1890 break
1891
1892 return replicas
1893
1894 async def _store_status(
1895 self,
1896 cluster_id: str,
1897 operation: str,
1898 kdu_instance: str,
1899 namespace: str = None,
1900 db_dict: dict = None,
1901 ) -> None:
1902 """
1903 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1904
1905 :param cluster_id (str): the cluster where the KDU instance is deployed
1906 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1907 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1908 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1909 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1910 values for the keys:
1911 - "collection": The Mongo DB collection to write to
1912 - "filter": The query filter to use in the update process
1913 - "path": The dot separated keys which targets the object to be updated
1914 Defaults to None.
1915 """
1916
1917 try:
1918 detailed_status = await self._status_kdu(
1919 cluster_id=cluster_id,
1920 kdu_instance=kdu_instance,
1921 yaml_format=False,
1922 namespace=namespace,
1923 )
1924
1925 status = detailed_status.get("info").get("description")
1926 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1927
1928 # write status to db
1929 result = await self.write_app_status_to_db(
1930 db_dict=db_dict,
1931 status=str(status),
1932 detailed_status=str(detailed_status),
1933 operation=operation,
1934 )
1935
1936 if not result:
1937 self.log.info("Error writing in database. Task exiting...")
1938
1939 except asyncio.CancelledError as e:
1940 self.log.warning(
1941 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1942 )
1943 except Exception as e:
1944 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1945
1946 # params for use in -f file
1947 # returns values file option and filename (in order to delete it at the end)
1948 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1949 if params and len(params) > 0:
1950 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1951
1952 def get_random_number():
1953 r = random.randrange(start=1, stop=99999999)
1954 s = str(r)
1955 while len(s) < 10:
1956 s = "0" + s
1957 return s
1958
1959 params2 = dict()
1960 for key in params:
1961 value = params.get(key)
1962 if "!!yaml" in str(value):
1963 value = yaml.safe_load(value[7:])
1964 params2[key] = value
1965
1966 values_file = get_random_number() + ".yaml"
1967 with open(values_file, "w") as stream:
1968 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1969
1970 return "-f {}".format(values_file), values_file
1971
1972 return "", None
1973
1974 # params for use in --set option
1975 @staticmethod
1976 def _params_to_set_option(params: dict) -> str:
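# example (illustrative): {"replicaCount": 2, "service.type": "NodePort"}
# -> '--set replicaCount=2,service.type=NodePort'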
1977 pairs = [
1978 f"{quote(str(key))}={quote(str(value))}"
1979 for key, value in params.items()
1980 if value is not None
1981 ]
1982 if not pairs:
1983 return ""
1984 return "--set " + ",".join(pairs)
1985
1986 @staticmethod
1987 def generate_kdu_instance_name(**kwargs):
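# example (illustrative): kdu_model "stable/openldap:1.2.3" -> "stable-openldap-1-2-3-0123456789"
# (the 10-digit random suffix is shown here as a placeholder)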
1988 chart_name = kwargs["kdu_model"]
1989 # check embedded chart (file or dir)
1990 if chart_name.startswith("/"):
1991 # extract file or directory name
1992 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1993 # check URL
1994 elif "://" in chart_name:
1995 # extract last portion of URL
1996 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1997
1998 name = ""
1999 for c in chart_name:
2000 if c.isalpha() or c.isnumeric():
2001 name += c
2002 else:
2003 name += "-"
2004 if len(name) > 35:
2005 name = name[0:35]
2006
2007 # if it does not start with an alpha character, prefix 'a'
2008 if not name[0].isalpha():
2009 name = "a" + name
2010
2011 name += "-"
2012
2013 def get_random_number():
2014 r = random.randrange(start=1, stop=99999999)
2015 s = str(r)
2016 s = s.rjust(10, "0")
2017 return s
2018
2019 name = name + get_random_number()
2020 return name.lower()
2021
2022 def _split_version(self, kdu_model: str) -> (str, str):
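# e.g. "stable/openldap:1.2.4" -> ("stable/openldap", "1.2.4");
# a model without ":" (or a local chart file path) keeps version=None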
2023 version = None
2024 if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
2025 parts = kdu_model.split(sep=":")
2026 if len(parts) == 2:
2027 version = str(parts[1])
2028 kdu_model = parts[0]
2029 return kdu_model, version
2030
2031 def _split_repo(self, kdu_model: str) -> (str, str):
2032 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2033
2034 Args:
2035 kdu_model (str): Associated KDU model
2036
2037 Returns:
2038 (str, str): Tuple with the Chart name in index 0, and the repo name
2039 in index 1; if there was a problem finding them, return None
2040 for both
2041 """
2042
2043 chart_name = None
2044 repo_name = None
2045
2046 idx = kdu_model.find("/")
2047 if idx >= 0:
2048 chart_name = kdu_model[idx + 1 :]
2049 repo_name = kdu_model[:idx]
2050
2051 return chart_name, repo_name
2052
2053 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
2054 """Obtain the Helm repository for an Helm Chart
2055
2056 Args:
2057 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2058 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2059
2060 Returns:
2061 str: the repository URL; if Helm Chart is a local one, the function returns None
2062 """
2063
2064 _, repo_name = self._split_repo(kdu_model=kdu_model)
2065
2066 repo_url = None
2067 if repo_name:
2068 # Find repository link
2069 local_repo_list = await self.repo_list(cluster_uuid)
2070 for repo in local_repo_list:
2071 if repo["name"] == repo_name:
2072 repo_url = repo["url"]
2073 break # it is not necessary to continue the loop if the repo link was found...
2074
2075 return repo_url