Fix security bug: Deserialization of Untrusted Data
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 def _get_namespace(self, cluster_uuid: str) -> str:
94 """
95 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 param: cluster_uuid: cluster's uuid
98 """
99
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
105
106 async def init_env(
107 self,
108 k8s_creds: str,
109 namespace: str = "kube-system",
110 reuse_cluster_uuid=None,
111 **kwargs,
112 ) -> (str, bool):
113 """
114 It prepares a given K8s cluster environment to run Charts
115
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 '.kube/config'
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
125 """
126
127 if reuse_cluster_uuid:
128 cluster_id = reuse_cluster_uuid
129 else:
130 cluster_id = str(uuid4())
131
132 self.log.debug(
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
134 )
135
136 paths, env = self._init_paths_env(
137 cluster_name=cluster_id, create_if_not_exist=True
138 )
139 mode = stat.S_IRUSR | stat.S_IWUSR
140 with open(paths["kube_config"], "w", mode) as f:
141 f.write(k8s_creds)
142 os.chmod(paths["kube_config"], 0o600)
143
144 # Code with initialization specific of helm version
145 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
146
147 # sync fs with local data
148 self.fs.reverse_sync(from_path=cluster_id)
149
150 self.log.info("Cluster {} initialized".format(cluster_id))
151
152 return cluster_id, n2vc_installed_sw
153
154 async def repo_add(
155 self,
156 cluster_uuid: str,
157 name: str,
158 url: str,
159 repo_type: str = "chart",
160 cert: str = None,
161 user: str = None,
162 password: str = None,
163 ):
164 self.log.debug(
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid, repo_type, name, url
167 )
168 )
169
170 # init_env
171 paths, env = self._init_paths_env(
172 cluster_name=cluster_uuid, create_if_not_exist=True
173 )
174
175 # sync local dir
176 self.fs.sync(from_path=cluster_uuid)
177
178 # helm repo add name url
179 command = ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths["kube_config"], self._helm_command, name, url
181 )
182
183 if cert:
184 temp_cert_file = os.path.join(
185 self.fs.path, "{}/helmcerts/".format(cluster_uuid), "temp.crt"
186 )
187 os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
188 with open(temp_cert_file, "w") as the_cert:
189 the_cert.write(cert)
190 command += " --ca-file {}".format(temp_cert_file)
191
192 if user:
193 command += " --username={}".format(user)
194
195 if password:
196 command += " --password={}".format(password)
197
198 self.log.debug("adding repo: {}".format(command))
199 await self._local_async_exec(
200 command=command, raise_exception_on_error=True, env=env
201 )
202
203 # helm repo update
204 command = "env KUBECONFIG={} {} repo update {}".format(
205 paths["kube_config"], self._helm_command, name
206 )
207 self.log.debug("updating repo: {}".format(command))
208 await self._local_async_exec(
209 command=command, raise_exception_on_error=False, env=env
210 )
211
212 # sync fs
213 self.fs.reverse_sync(from_path=cluster_uuid)
214
215 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
216 self.log.debug(
217 "Cluster {}, updating {} repository {}".format(
218 cluster_uuid, repo_type, name
219 )
220 )
221
222 # init_env
223 paths, env = self._init_paths_env(
224 cluster_name=cluster_uuid, create_if_not_exist=True
225 )
226
227 # sync local dir
228 self.fs.sync(from_path=cluster_uuid)
229
230 # helm repo update
231 command = "{} repo update {}".format(self._helm_command, name)
232 self.log.debug("updating repo: {}".format(command))
233 await self._local_async_exec(
234 command=command, raise_exception_on_error=False, env=env
235 )
236
237 # sync fs
238 self.fs.reverse_sync(from_path=cluster_uuid)
239
240 async def repo_list(self, cluster_uuid: str) -> list:
241 """
242 Get the list of registered repositories
243
244 :return: list of registered repositories: [ (name, url) .... ]
245 """
246
247 self.log.debug("list repositories for cluster {}".format(cluster_uuid))
248
249 # config filename
250 paths, env = self._init_paths_env(
251 cluster_name=cluster_uuid, create_if_not_exist=True
252 )
253
254 # sync local dir
255 self.fs.sync(from_path=cluster_uuid)
256
257 command = "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths["kube_config"], self._helm_command
259 )
260
261 # Set exception to false because if there are no repos just want an empty list
262 output, _rc = await self._local_async_exec(
263 command=command, raise_exception_on_error=False, env=env
264 )
265
266 # sync fs
267 self.fs.reverse_sync(from_path=cluster_uuid)
268
269 if _rc == 0:
270 if output and len(output) > 0:
271 repos = yaml.load(output, Loader=yaml.SafeLoader)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self._lower_keys_list(repos)
274 else:
275 return []
276 else:
277 return []
278
279 async def repo_remove(self, cluster_uuid: str, name: str):
280 self.log.debug(
281 "remove {} repositories for cluster {}".format(name, cluster_uuid)
282 )
283
284 # init env, paths
285 paths, env = self._init_paths_env(
286 cluster_name=cluster_uuid, create_if_not_exist=True
287 )
288
289 # sync local dir
290 self.fs.sync(from_path=cluster_uuid)
291
292 command = "env KUBECONFIG={} {} repo remove {}".format(
293 paths["kube_config"], self._helm_command, name
294 )
295 await self._local_async_exec(
296 command=command, raise_exception_on_error=True, env=env
297 )
298
299 # sync fs
300 self.fs.reverse_sync(from_path=cluster_uuid)
301
302 async def reset(
303 self,
304 cluster_uuid: str,
305 force: bool = False,
306 uninstall_sw: bool = False,
307 **kwargs,
308 ) -> bool:
309 """Reset a cluster
310
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
312
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
318 """
319 namespace = self._get_namespace(cluster_uuid=cluster_uuid)
320 self.log.debug(
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid, uninstall_sw
323 )
324 )
325
326 # sync local dir
327 self.fs.sync(from_path=cluster_uuid)
328
329 # uninstall releases if needed.
330 if uninstall_sw:
331 releases = await self.instances_list(cluster_uuid=cluster_uuid)
332 if len(releases) > 0:
333 if force:
334 for r in releases:
335 try:
336 kdu_instance = r.get("name")
337 chart = r.get("chart")
338 self.log.debug(
339 "Uninstalling {} -> {}".format(chart, kdu_instance)
340 )
341 await self.uninstall(
342 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
343 )
344 except Exception as e:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
347 # raised an error
348 self.log.warn(
349 "Error uninstalling release {}: {}".format(
350 kdu_instance, e
351 )
352 )
353 else:
354 msg = (
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid)
357 self.log.warn(msg)
358 uninstall_sw = (
359 False # Allow to remove k8s cluster without removing Tiller
360 )
361
362 if uninstall_sw:
363 await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)
364
365 # delete cluster directory
366 self.log.debug("Removing directory {}".format(cluster_uuid))
367 self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
368 # Remove also local directorio if still exist
369 direct = self.fs.path + "/" + cluster_uuid
370 shutil.rmtree(direct, ignore_errors=True)
371
372 return True
373
374 async def _install_impl(
375 self,
376 cluster_id: str,
377 kdu_model: str,
378 paths: dict,
379 env: dict,
380 kdu_instance: str,
381 atomic: bool = True,
382 timeout: float = 300,
383 params: dict = None,
384 db_dict: dict = None,
385 kdu_name: str = None,
386 namespace: str = None,
387 ):
388 # init env, paths
389 paths, env = self._init_paths_env(
390 cluster_name=cluster_id, create_if_not_exist=True
391 )
392
393 # params to str
394 params_str, file_to_delete = self._params_to_file_option(
395 cluster_id=cluster_id, params=params
396 )
397
398 # version
399 kdu_model, version = self._split_version(kdu_model)
400
401 repo = self._split_repo(kdu_model)
402 if repo:
403 self.repo_update(cluster_id, repo)
404
405 command = self._get_install_command(
406 kdu_model,
407 kdu_instance,
408 namespace,
409 params_str,
410 version,
411 atomic,
412 timeout,
413 paths["kube_config"],
414 )
415
416 self.log.debug("installing: {}".format(command))
417
418 if atomic:
419 # exec helm in a task
420 exec_task = asyncio.ensure_future(
421 coro_or_future=self._local_async_exec(
422 command=command, raise_exception_on_error=False, env=env
423 )
424 )
425
426 # write status in another task
427 status_task = asyncio.ensure_future(
428 coro_or_future=self._store_status(
429 cluster_id=cluster_id,
430 kdu_instance=kdu_instance,
431 namespace=namespace,
432 db_dict=db_dict,
433 operation="install",
434 run_once=False,
435 )
436 )
437
438 # wait for execution task
439 await asyncio.wait([exec_task])
440
441 # cancel status task
442 status_task.cancel()
443
444 output, rc = exec_task.result()
445
446 else:
447
448 output, rc = await self._local_async_exec(
449 command=command, raise_exception_on_error=False, env=env
450 )
451
452 # remove temporal values yaml file
453 if file_to_delete:
454 os.remove(file_to_delete)
455
456 # write final status
457 await self._store_status(
458 cluster_id=cluster_id,
459 kdu_instance=kdu_instance,
460 namespace=namespace,
461 db_dict=db_dict,
462 operation="install",
463 run_once=True,
464 check_every=0,
465 )
466
467 if rc != 0:
468 msg = "Error executing command: {}\nOutput: {}".format(command, output)
469 self.log.error(msg)
470 raise K8sException(msg)
471
472 async def upgrade(
473 self,
474 cluster_uuid: str,
475 kdu_instance: str,
476 kdu_model: str = None,
477 atomic: bool = True,
478 timeout: float = 300,
479 params: dict = None,
480 db_dict: dict = None,
481 ):
482 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))
483
484 # sync local dir
485 self.fs.sync(from_path=cluster_uuid)
486
487 # look for instance to obtain namespace
488 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
489 if not instance_info:
490 raise K8sException("kdu_instance {} not found".format(kdu_instance))
491
492 # init env, paths
493 paths, env = self._init_paths_env(
494 cluster_name=cluster_uuid, create_if_not_exist=True
495 )
496
497 # sync local dir
498 self.fs.sync(from_path=cluster_uuid)
499
500 # params to str
501 params_str, file_to_delete = self._params_to_file_option(
502 cluster_id=cluster_uuid, params=params
503 )
504
505 # version
506 kdu_model, version = self._split_version(kdu_model)
507
508 repo = self._split_repo(kdu_model)
509 if repo:
510 self.repo_update(cluster_uuid, repo)
511
512 command = self._get_upgrade_command(
513 kdu_model,
514 kdu_instance,
515 instance_info["namespace"],
516 params_str,
517 version,
518 atomic,
519 timeout,
520 paths["kube_config"],
521 )
522
523 self.log.debug("upgrading: {}".format(command))
524
525 if atomic:
526
527 # exec helm in a task
528 exec_task = asyncio.ensure_future(
529 coro_or_future=self._local_async_exec(
530 command=command, raise_exception_on_error=False, env=env
531 )
532 )
533 # write status in another task
534 status_task = asyncio.ensure_future(
535 coro_or_future=self._store_status(
536 cluster_id=cluster_uuid,
537 kdu_instance=kdu_instance,
538 namespace=instance_info["namespace"],
539 db_dict=db_dict,
540 operation="upgrade",
541 run_once=False,
542 )
543 )
544
545 # wait for execution task
546 await asyncio.wait([exec_task])
547
548 # cancel status task
549 status_task.cancel()
550 output, rc = exec_task.result()
551
552 else:
553
554 output, rc = await self._local_async_exec(
555 command=command, raise_exception_on_error=False, env=env
556 )
557
558 # remove temporal values yaml file
559 if file_to_delete:
560 os.remove(file_to_delete)
561
562 # write final status
563 await self._store_status(
564 cluster_id=cluster_uuid,
565 kdu_instance=kdu_instance,
566 namespace=instance_info["namespace"],
567 db_dict=db_dict,
568 operation="upgrade",
569 run_once=True,
570 check_every=0,
571 )
572
573 if rc != 0:
574 msg = "Error executing command: {}\nOutput: {}".format(command, output)
575 self.log.error(msg)
576 raise K8sException(msg)
577
578 # sync fs
579 self.fs.reverse_sync(from_path=cluster_uuid)
580
581 # return new revision number
582 instance = await self.get_instance_info(
583 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
584 )
585 if instance:
586 revision = int(instance.get("revision"))
587 self.log.debug("New revision: {}".format(revision))
588 return revision
589 else:
590 return 0
591
592 async def scale(
593 self,
594 kdu_instance: str,
595 scale: int,
596 resource_name: str,
597 total_timeout: float = 1800,
598 cluster_uuid: str = None,
599 kdu_model: str = None,
600 atomic: bool = True,
601 db_dict: dict = None,
602 **kwargs,
603 ):
604 """Scale a resource in a Helm Chart.
605
606 Args:
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
617
618 Returns:
619 True if successful, False otherwise
620 """
621
622 debug_mgs = "scaling {} in cluster {}".format(kdu_model, cluster_uuid)
623 if resource_name:
624 debug_mgs = "scaling resource {} in model {} (cluster {})".format(
625 resource_name, kdu_model, cluster_uuid
626 )
627
628 self.log.debug(debug_mgs)
629
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
633 if not instance_info:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance))
635
636 # init env, paths
637 paths, env = self._init_paths_env(
638 cluster_name=cluster_uuid, create_if_not_exist=True
639 )
640
641 # version
642 kdu_model, version = self._split_version(kdu_model)
643
644 repo_url = await self._find_repo(kdu_model, cluster_uuid)
645 if not repo_url:
646 raise K8sException(
647 "Repository not found for kdu_model {}".format(kdu_model)
648 )
649
650 _, replica_str = await self._get_replica_count_url(
651 kdu_model, repo_url, resource_name
652 )
653
654 command = self._get_upgrade_scale_command(
655 kdu_model,
656 kdu_instance,
657 instance_info["namespace"],
658 scale,
659 version,
660 atomic,
661 replica_str,
662 total_timeout,
663 resource_name,
664 paths["kube_config"],
665 )
666
667 self.log.debug("scaling: {}".format(command))
668
669 if atomic:
670 # exec helm in a task
671 exec_task = asyncio.ensure_future(
672 coro_or_future=self._local_async_exec(
673 command=command, raise_exception_on_error=False, env=env
674 )
675 )
676 # write status in another task
677 status_task = asyncio.ensure_future(
678 coro_or_future=self._store_status(
679 cluster_id=cluster_uuid,
680 kdu_instance=kdu_instance,
681 namespace=instance_info["namespace"],
682 db_dict=db_dict,
683 operation="scale",
684 run_once=False,
685 )
686 )
687
688 # wait for execution task
689 await asyncio.wait([exec_task])
690
691 # cancel status task
692 status_task.cancel()
693 output, rc = exec_task.result()
694
695 else:
696 output, rc = await self._local_async_exec(
697 command=command, raise_exception_on_error=False, env=env
698 )
699
700 # write final status
701 await self._store_status(
702 cluster_id=cluster_uuid,
703 kdu_instance=kdu_instance,
704 namespace=instance_info["namespace"],
705 db_dict=db_dict,
706 operation="scale",
707 run_once=True,
708 check_every=0,
709 )
710
711 if rc != 0:
712 msg = "Error executing command: {}\nOutput: {}".format(command, output)
713 self.log.error(msg)
714 raise K8sException(msg)
715
716 # sync fs
717 self.fs.reverse_sync(from_path=cluster_uuid)
718
719 return True
720
721 async def get_scale_count(
722 self,
723 resource_name: str,
724 kdu_instance: str,
725 cluster_uuid: str,
726 kdu_model: str,
727 **kwargs,
728 ) -> int:
729 """Get a resource scale count.
730
731 Args:
732 cluster_uuid: The UUID of the cluster
733 resource_name: Resource name
734 kdu_instance: KDU instance name
735 kdu_model: The name or path of a bundle
736 kwargs: Additional parameters
737
738 Returns:
739 Resource instance count
740 """
741
742 self.log.debug(
743 "getting scale count for {} in cluster {}".format(kdu_model, cluster_uuid)
744 )
745
746 # look for instance to obtain namespace
747 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
748 if not instance_info:
749 raise K8sException("kdu_instance {} not found".format(kdu_instance))
750
751 # init env, paths
752 paths, env = self._init_paths_env(
753 cluster_name=cluster_uuid, create_if_not_exist=True
754 )
755
756 replicas = await self._get_replica_count_instance(
757 kdu_instance, instance_info["namespace"], paths["kube_config"]
758 )
759
760 # Get default value if scale count is not found from provided values
761 if not replicas:
762 repo_url = await self._find_repo(kdu_model, cluster_uuid)
763 if not repo_url:
764 raise K8sException(
765 "Repository not found for kdu_model {}".format(kdu_model)
766 )
767
768 replicas, _ = await self._get_replica_count_url(
769 kdu_model, repo_url, resource_name
770 )
771
772 if not replicas:
773 msg = "Replica count not found. Cannot be scaled"
774 self.log.error(msg)
775 raise K8sException(msg)
776
777 return int(replicas)
778
779 async def rollback(
780 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
781 ):
782 self.log.debug(
783 "rollback kdu_instance {} to revision {} from cluster {}".format(
784 kdu_instance, revision, cluster_uuid
785 )
786 )
787
788 # sync local dir
789 self.fs.sync(from_path=cluster_uuid)
790
791 # look for instance to obtain namespace
792 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
793 if not instance_info:
794 raise K8sException("kdu_instance {} not found".format(kdu_instance))
795
796 # init env, paths
797 paths, env = self._init_paths_env(
798 cluster_name=cluster_uuid, create_if_not_exist=True
799 )
800
801 # sync local dir
802 self.fs.sync(from_path=cluster_uuid)
803
804 command = self._get_rollback_command(
805 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
806 )
807
808 self.log.debug("rolling_back: {}".format(command))
809
810 # exec helm in a task
811 exec_task = asyncio.ensure_future(
812 coro_or_future=self._local_async_exec(
813 command=command, raise_exception_on_error=False, env=env
814 )
815 )
816 # write status in another task
817 status_task = asyncio.ensure_future(
818 coro_or_future=self._store_status(
819 cluster_id=cluster_uuid,
820 kdu_instance=kdu_instance,
821 namespace=instance_info["namespace"],
822 db_dict=db_dict,
823 operation="rollback",
824 run_once=False,
825 )
826 )
827
828 # wait for execution task
829 await asyncio.wait([exec_task])
830
831 # cancel status task
832 status_task.cancel()
833
834 output, rc = exec_task.result()
835
836 # write final status
837 await self._store_status(
838 cluster_id=cluster_uuid,
839 kdu_instance=kdu_instance,
840 namespace=instance_info["namespace"],
841 db_dict=db_dict,
842 operation="rollback",
843 run_once=True,
844 check_every=0,
845 )
846
847 if rc != 0:
848 msg = "Error executing command: {}\nOutput: {}".format(command, output)
849 self.log.error(msg)
850 raise K8sException(msg)
851
852 # sync fs
853 self.fs.reverse_sync(from_path=cluster_uuid)
854
855 # return new revision number
856 instance = await self.get_instance_info(
857 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
858 )
859 if instance:
860 revision = int(instance.get("revision"))
861 self.log.debug("New revision: {}".format(revision))
862 return revision
863 else:
864 return 0
865
866 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
867 """
868 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
869 (this call should happen after all _terminate-config-primitive_ of the VNF
870 are invoked).
871
872 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
873 :param kdu_instance: unique name for the KDU instance to be deleted
874 :param kwargs: Additional parameters (None yet)
875 :return: True if successful
876 """
877
878 self.log.debug(
879 "uninstall kdu_instance {} from cluster {}".format(
880 kdu_instance, cluster_uuid
881 )
882 )
883
884 # sync local dir
885 self.fs.sync(from_path=cluster_uuid)
886
887 # look for instance to obtain namespace
888 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
889 if not instance_info:
890 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
891 return True
892 # init env, paths
893 paths, env = self._init_paths_env(
894 cluster_name=cluster_uuid, create_if_not_exist=True
895 )
896
897 # sync local dir
898 self.fs.sync(from_path=cluster_uuid)
899
900 command = self._get_uninstall_command(
901 kdu_instance, instance_info["namespace"], paths["kube_config"]
902 )
903 output, _rc = await self._local_async_exec(
904 command=command, raise_exception_on_error=True, env=env
905 )
906
907 # sync fs
908 self.fs.reverse_sync(from_path=cluster_uuid)
909
910 return self._output_to_table(output)
911
912 async def instances_list(self, cluster_uuid: str) -> list:
913 """
914 returns a list of deployed releases in a cluster
915
916 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
917 :return:
918 """
919
920 self.log.debug("list releases for cluster {}".format(cluster_uuid))
921
922 # sync local dir
923 self.fs.sync(from_path=cluster_uuid)
924
925 # execute internal command
926 result = await self._instances_list(cluster_uuid)
927
928 # sync fs
929 self.fs.reverse_sync(from_path=cluster_uuid)
930
931 return result
932
933 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
934 instances = await self.instances_list(cluster_uuid=cluster_uuid)
935 for instance in instances:
936 if instance.get("name") == kdu_instance:
937 return instance
938 self.log.debug("Instance {} not found".format(kdu_instance))
939 return None
940
941 async def upgrade_charm(
942 self,
943 ee_id: str = None,
944 path: str = None,
945 charm_id: str = None,
946 charm_type: str = None,
947 timeout: float = None,
948 ) -> str:
949 """This method upgrade charms in VNFs
950
951 Args:
952 ee_id: Execution environment id
953 path: Local path to the charm
954 charm_id: charm-id
955 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
956 timeout: (Float) Timeout for the ns update operation
957
958 Returns:
959 The output of the update operation if status equals to "completed"
960 """
961 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
962
963 async def exec_primitive(
964 self,
965 cluster_uuid: str = None,
966 kdu_instance: str = None,
967 primitive_name: str = None,
968 timeout: float = 300,
969 params: dict = None,
970 db_dict: dict = None,
971 **kwargs,
972 ) -> str:
973 """Exec primitive (Juju action)
974
975 :param cluster_uuid: The UUID of the cluster or namespace:cluster
976 :param kdu_instance: The unique name of the KDU instance
977 :param primitive_name: Name of action that will be executed
978 :param timeout: Timeout for action execution
979 :param params: Dictionary of all the parameters needed for the action
980 :db_dict: Dictionary for any additional data
981 :param kwargs: Additional parameters (None yet)
982
983 :return: Returns the output of the action
984 """
985 raise K8sException(
986 "KDUs deployed with Helm don't support actions "
987 "different from rollback, upgrade and status"
988 )
989
990 async def get_services(
991 self, cluster_uuid: str, kdu_instance: str, namespace: str
992 ) -> list:
993 """
994 Returns a list of services defined for the specified kdu instance.
995
996 :param cluster_uuid: UUID of a K8s cluster known by OSM
997 :param kdu_instance: unique name for the KDU instance
998 :param namespace: K8s namespace used by the KDU instance
999 :return: If successful, it will return a list of services, Each service
1000 can have the following data:
1001 - `name` of the service
1002 - `type` type of service in the k8 cluster
1003 - `ports` List of ports offered by the service, for each port includes at least
1004 name, port, protocol
1005 - `cluster_ip` Internal ip to be used inside k8s cluster
1006 - `external_ip` List of external ips (in case they are available)
1007 """
1008
1009 self.log.debug(
1010 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1011 cluster_uuid, kdu_instance
1012 )
1013 )
1014
1015 # init env, paths
1016 paths, env = self._init_paths_env(
1017 cluster_name=cluster_uuid, create_if_not_exist=True
1018 )
1019
1020 # sync local dir
1021 self.fs.sync(from_path=cluster_uuid)
1022
1023 # get list of services names for kdu
1024 service_names = await self._get_services(
1025 cluster_uuid, kdu_instance, namespace, paths["kube_config"]
1026 )
1027
1028 service_list = []
1029 for service in service_names:
1030 service = await self._get_service(cluster_uuid, service, namespace)
1031 service_list.append(service)
1032
1033 # sync fs
1034 self.fs.reverse_sync(from_path=cluster_uuid)
1035
1036 return service_list
1037
1038 async def get_service(
1039 self, cluster_uuid: str, service_name: str, namespace: str
1040 ) -> object:
1041
1042 self.log.debug(
1043 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1044 service_name, namespace, cluster_uuid
1045 )
1046 )
1047
1048 # sync local dir
1049 self.fs.sync(from_path=cluster_uuid)
1050
1051 service = await self._get_service(cluster_uuid, service_name, namespace)
1052
1053 # sync fs
1054 self.fs.reverse_sync(from_path=cluster_uuid)
1055
1056 return service
1057
1058 async def status_kdu(
1059 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
1060 ) -> Union[str, dict]:
1061 """
1062 This call would retrieve tha current state of a given KDU instance. It would be
1063 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1064 values_ of the configuration parameters applied to a given instance. This call
1065 would be based on the `status` call.
1066
1067 :param cluster_uuid: UUID of a K8s cluster known by OSM
1068 :param kdu_instance: unique name for the KDU instance
1069 :param kwargs: Additional parameters (None yet)
1070 :param yaml_format: if the return shall be returned as an YAML string or as a
1071 dictionary
1072 :return: If successful, it will return the following vector of arguments:
1073 - K8s `namespace` in the cluster where the KDU lives
1074 - `state` of the KDU instance. It can be:
1075 - UNKNOWN
1076 - DEPLOYED
1077 - DELETED
1078 - SUPERSEDED
1079 - FAILED or
1080 - DELETING
1081 - List of `resources` (objects) that this release consists of, sorted by kind,
1082 and the status of those resources
1083 - Last `deployment_time`.
1084
1085 """
1086 self.log.debug(
1087 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1088 cluster_uuid, kdu_instance
1089 )
1090 )
1091
1092 # sync local dir
1093 self.fs.sync(from_path=cluster_uuid)
1094
1095 # get instance: needed to obtain namespace
1096 instances = await self._instances_list(cluster_id=cluster_uuid)
1097 for instance in instances:
1098 if instance.get("name") == kdu_instance:
1099 break
1100 else:
1101 # instance does not exist
1102 raise K8sException(
1103 "Instance name: {} not found in cluster: {}".format(
1104 kdu_instance, cluster_uuid
1105 )
1106 )
1107
1108 status = await self._status_kdu(
1109 cluster_id=cluster_uuid,
1110 kdu_instance=kdu_instance,
1111 namespace=instance["namespace"],
1112 yaml_format=yaml_format,
1113 show_error_log=True,
1114 )
1115
1116 # sync fs
1117 self.fs.reverse_sync(from_path=cluster_uuid)
1118
1119 return status
1120
1121 async def get_values_kdu(
1122 self, kdu_instance: str, namespace: str, kubeconfig: str
1123 ) -> str:
1124
1125 self.log.debug("get kdu_instance values {}".format(kdu_instance))
1126
1127 return await self._exec_get_command(
1128 get_command="values",
1129 kdu_instance=kdu_instance,
1130 namespace=namespace,
1131 kubeconfig=kubeconfig,
1132 )
1133
1134 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1135
1136 self.log.debug(
1137 "inspect kdu_model values {} from (optional) repo: {}".format(
1138 kdu_model, repo_url
1139 )
1140 )
1141
1142 return await self._exec_inspect_command(
1143 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
1144 )
1145
1146 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
1147
1148 self.log.debug(
1149 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
1150 )
1151
1152 return await self._exec_inspect_command(
1153 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
1154 )
1155
1156 async def synchronize_repos(self, cluster_uuid: str):
1157
1158 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
1159 try:
1160 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
1161 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
1162
1163 local_repo_list = await self.repo_list(cluster_uuid)
1164 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
1165
1166 deleted_repo_list = []
1167 added_repo_dict = {}
1168
1169 # iterate over the list of repos in the database that should be
1170 # added if not present
1171 for repo_name, db_repo in db_repo_dict.items():
1172 try:
1173 # check if it is already present
1174 curr_repo_url = local_repo_dict.get(db_repo["name"])
1175 repo_id = db_repo.get("_id")
1176 if curr_repo_url != db_repo["url"]:
1177 if curr_repo_url:
1178 self.log.debug(
1179 "repo {} url changed, delete and and again".format(
1180 db_repo["url"]
1181 )
1182 )
1183 await self.repo_remove(cluster_uuid, db_repo["name"])
1184 deleted_repo_list.append(repo_id)
1185
1186 # add repo
1187 self.log.debug("add repo {}".format(db_repo["name"]))
1188 if "ca_cert" in db_repo:
1189 await self.repo_add(
1190 cluster_uuid,
1191 db_repo["name"],
1192 db_repo["url"],
1193 cert=db_repo["ca_cert"],
1194 )
1195 else:
1196 await self.repo_add(
1197 cluster_uuid,
1198 db_repo["name"],
1199 db_repo["url"],
1200 )
1201 added_repo_dict[repo_id] = db_repo["name"]
1202 except Exception as e:
1203 raise K8sException(
1204 "Error adding repo id: {}, err_msg: {} ".format(
1205 repo_id, repr(e)
1206 )
1207 )
1208
1209 # Delete repos that are present but not in nbi_list
1210 for repo_name in local_repo_dict:
1211 if not db_repo_dict.get(repo_name) and repo_name != "stable":
1212 self.log.debug("delete repo {}".format(repo_name))
1213 try:
1214 await self.repo_remove(cluster_uuid, repo_name)
1215 deleted_repo_list.append(repo_name)
1216 except Exception as e:
1217 self.warning(
1218 "Error deleting repo, name: {}, err_msg: {}".format(
1219 repo_name, str(e)
1220 )
1221 )
1222
1223 return deleted_repo_list, added_repo_dict
1224
1225 except K8sException:
1226 raise
1227 except Exception as e:
1228 # Do not raise errors synchronizing repos
1229 self.log.error("Error synchronizing repos: {}".format(e))
1230 raise Exception("Error synchronizing repos: {}".format(e))
1231
1232 def _get_db_repos_dict(self, repo_ids: list):
1233 db_repos_dict = {}
1234 for repo_id in repo_ids:
1235 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1236 db_repos_dict[db_repo["name"]] = db_repo
1237 return db_repos_dict
1238
1239 """
1240 ####################################################################################
1241 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1242 ####################################################################################
1243 """
1244
1245 @abc.abstractmethod
1246 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1247 """
1248 Creates and returns base cluster and kube dirs and returns them.
1249 Also created helm3 dirs according to new directory specification, paths are
1250 not returned but assigned to helm environment variables
1251
1252 :param cluster_name: cluster_name
1253 :return: Dictionary with config_paths and dictionary with helm environment variables
1254 """
1255
1256 @abc.abstractmethod
1257 async def _cluster_init(self, cluster_id, namespace, paths, env):
1258 """
1259 Implements the helm version dependent cluster initialization
1260 """
1261
1262 @abc.abstractmethod
1263 async def _instances_list(self, cluster_id):
1264 """
1265 Implements the helm version dependent helm instances list
1266 """
1267
1268 @abc.abstractmethod
1269 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1270 """
1271 Implements the helm version dependent method to obtain services from a helm instance
1272 """
1273
1274 @abc.abstractmethod
1275 async def _status_kdu(
1276 self,
1277 cluster_id: str,
1278 kdu_instance: str,
1279 namespace: str = None,
1280 yaml_format: bool = False,
1281 show_error_log: bool = False,
1282 ) -> Union[str, dict]:
1283 """
1284 Implements the helm version dependent method to obtain status of a helm instance
1285 """
1286
1287 @abc.abstractmethod
1288 def _get_install_command(
1289 self,
1290 kdu_model,
1291 kdu_instance,
1292 namespace,
1293 params_str,
1294 version,
1295 atomic,
1296 timeout,
1297 kubeconfig,
1298 ) -> str:
1299 """
1300 Obtain command to be executed to delete the indicated instance
1301 """
1302
1303 @abc.abstractmethod
1304 def _get_upgrade_scale_command(
1305 self,
1306 kdu_model,
1307 kdu_instance,
1308 namespace,
1309 count,
1310 version,
1311 atomic,
1312 replicas,
1313 timeout,
1314 resource_name,
1315 kubeconfig,
1316 ) -> str:
1317 """Obtain command to be executed to upgrade the indicated instance."""
1318
1319 @abc.abstractmethod
1320 def _get_upgrade_command(
1321 self,
1322 kdu_model,
1323 kdu_instance,
1324 namespace,
1325 params_str,
1326 version,
1327 atomic,
1328 timeout,
1329 kubeconfig,
1330 ) -> str:
1331 """
1332 Obtain command to be executed to upgrade the indicated instance
1333 """
1334
1335 @abc.abstractmethod
1336 def _get_rollback_command(
1337 self, kdu_instance, namespace, revision, kubeconfig
1338 ) -> str:
1339 """
1340 Obtain command to be executed to rollback the indicated instance
1341 """
1342
1343 @abc.abstractmethod
1344 def _get_uninstall_command(
1345 self, kdu_instance: str, namespace: str, kubeconfig: str
1346 ) -> str:
1347 """
1348 Obtain command to be executed to delete the indicated instance
1349 """
1350
1351 @abc.abstractmethod
1352 def _get_inspect_command(
1353 self, show_command: str, kdu_model: str, repo_str: str, version: str
1354 ):
1355 """
1356 Obtain command to be executed to obtain information about the kdu
1357 """
1358
1359 @abc.abstractmethod
1360 def _get_get_command(
1361 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1362 ):
1363 """Obtain command to be executed to get information about the kdu instance."""
1364
1365 @abc.abstractmethod
1366 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1367 """
1368 Method call to uninstall cluster software for helm. This method is dependent
1369 of helm version
1370 For Helm v2 it will be called when Tiller must be uninstalled
1371 For Helm v3 it does nothing and does not need to be callled
1372 """
1373
1374 @abc.abstractmethod
1375 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1376 """
1377 Obtains the cluster repos identifiers
1378 """
1379
1380 """
1381 ####################################################################################
1382 ################################### P R I V A T E ##################################
1383 ####################################################################################
1384 """
1385
1386 @staticmethod
1387 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1388 if os.path.exists(filename):
1389 return True
1390 else:
1391 msg = "File {} does not exist".format(filename)
1392 if exception_if_not_exists:
1393 raise K8sException(msg)
1394
1395 @staticmethod
1396 def _remove_multiple_spaces(strobj):
1397 strobj = strobj.strip()
1398 while " " in strobj:
1399 strobj = strobj.replace(" ", " ")
1400 return strobj
1401
1402 @staticmethod
1403 def _output_to_lines(output: str) -> list:
1404 output_lines = list()
1405 lines = output.splitlines(keepends=False)
1406 for line in lines:
1407 line = line.strip()
1408 if len(line) > 0:
1409 output_lines.append(line)
1410 return output_lines
1411
1412 @staticmethod
1413 def _output_to_table(output: str) -> list:
1414 output_table = list()
1415 lines = output.splitlines(keepends=False)
1416 for line in lines:
1417 line = line.replace("\t", " ")
1418 line_list = list()
1419 output_table.append(line_list)
1420 cells = line.split(sep=" ")
1421 for cell in cells:
1422 cell = cell.strip()
1423 if len(cell) > 0:
1424 line_list.append(cell)
1425 return output_table
1426
1427 @staticmethod
1428 def _parse_services(output: str) -> list:
1429 lines = output.splitlines(keepends=False)
1430 services = []
1431 for line in lines:
1432 line = line.replace("\t", " ")
1433 cells = line.split(sep=" ")
1434 if len(cells) > 0 and cells[0].startswith("service/"):
1435 elems = cells[0].split(sep="/")
1436 if len(elems) > 1:
1437 services.append(elems[1])
1438 return services
1439
1440 @staticmethod
1441 def _get_deep(dictionary: dict, members: tuple):
1442 target = dictionary
1443 value = None
1444 try:
1445 for m in members:
1446 value = target.get(m)
1447 if not value:
1448 return None
1449 else:
1450 target = value
1451 except Exception:
1452 pass
1453 return value
1454
1455 # find key:value in several lines
1456 @staticmethod
1457 def _find_in_lines(p_lines: list, p_key: str) -> str:
1458 for line in p_lines:
1459 try:
1460 if line.startswith(p_key + ":"):
1461 parts = line.split(":")
1462 the_value = parts[1].strip()
1463 return the_value
1464 except Exception:
1465 # ignore it
1466 pass
1467 return None
1468
1469 @staticmethod
1470 def _lower_keys_list(input_list: list):
1471 """
1472 Transform the keys in a list of dictionaries to lower case and returns a new list
1473 of dictionaries
1474 """
1475 new_list = []
1476 if input_list:
1477 for dictionary in input_list:
1478 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1479 new_list.append(new_dict)
1480 return new_list
1481
1482 async def _local_async_exec(
1483 self,
1484 command: str,
1485 raise_exception_on_error: bool = False,
1486 show_error_log: bool = True,
1487 encode_utf8: bool = False,
1488 env: dict = None,
1489 ) -> (str, int):
1490
1491 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1492 self.log.debug(
1493 "Executing async local command: {}, env: {}".format(command, env)
1494 )
1495
1496 # split command
1497 command = shlex.split(command)
1498
1499 environ = os.environ.copy()
1500 if env:
1501 environ.update(env)
1502
1503 try:
1504 process = await asyncio.create_subprocess_exec(
1505 *command,
1506 stdout=asyncio.subprocess.PIPE,
1507 stderr=asyncio.subprocess.PIPE,
1508 env=environ,
1509 )
1510
1511 # wait for command terminate
1512 stdout, stderr = await process.communicate()
1513
1514 return_code = process.returncode
1515
1516 output = ""
1517 if stdout:
1518 output = stdout.decode("utf-8").strip()
1519 # output = stdout.decode()
1520 if stderr:
1521 output = stderr.decode("utf-8").strip()
1522 # output = stderr.decode()
1523
1524 if return_code != 0 and show_error_log:
1525 self.log.debug(
1526 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1527 )
1528 else:
1529 self.log.debug("Return code: {}".format(return_code))
1530
1531 if raise_exception_on_error and return_code != 0:
1532 raise K8sException(output)
1533
1534 if encode_utf8:
1535 output = output.encode("utf-8").strip()
1536 output = str(output).replace("\\n", "\n")
1537
1538 return output, return_code
1539
1540 except asyncio.CancelledError:
1541 raise
1542 except K8sException:
1543 raise
1544 except Exception as e:
1545 msg = "Exception executing command: {} -> {}".format(command, e)
1546 self.log.error(msg)
1547 if raise_exception_on_error:
1548 raise K8sException(e) from e
1549 else:
1550 return "", -1
1551
1552 async def _local_async_exec_pipe(
1553 self,
1554 command1: str,
1555 command2: str,
1556 raise_exception_on_error: bool = True,
1557 show_error_log: bool = True,
1558 encode_utf8: bool = False,
1559 env: dict = None,
1560 ):
1561
1562 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1563 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1564 command = "{} | {}".format(command1, command2)
1565 self.log.debug(
1566 "Executing async local command: {}, env: {}".format(command, env)
1567 )
1568
1569 # split command
1570 command1 = shlex.split(command1)
1571 command2 = shlex.split(command2)
1572
1573 environ = os.environ.copy()
1574 if env:
1575 environ.update(env)
1576
1577 try:
1578 read, write = os.pipe()
1579 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1580 os.close(write)
1581 process_2 = await asyncio.create_subprocess_exec(
1582 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1583 )
1584 os.close(read)
1585 stdout, stderr = await process_2.communicate()
1586
1587 return_code = process_2.returncode
1588
1589 output = ""
1590 if stdout:
1591 output = stdout.decode("utf-8").strip()
1592 # output = stdout.decode()
1593 if stderr:
1594 output = stderr.decode("utf-8").strip()
1595 # output = stderr.decode()
1596
1597 if return_code != 0 and show_error_log:
1598 self.log.debug(
1599 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1600 )
1601 else:
1602 self.log.debug("Return code: {}".format(return_code))
1603
1604 if raise_exception_on_error and return_code != 0:
1605 raise K8sException(output)
1606
1607 if encode_utf8:
1608 output = output.encode("utf-8").strip()
1609 output = str(output).replace("\\n", "\n")
1610
1611 return output, return_code
1612 except asyncio.CancelledError:
1613 raise
1614 except K8sException:
1615 raise
1616 except Exception as e:
1617 msg = "Exception executing command: {} -> {}".format(command, e)
1618 self.log.error(msg)
1619 if raise_exception_on_error:
1620 raise K8sException(e) from e
1621 else:
1622 return "", -1
1623
1624 async def _get_service(self, cluster_id, service_name, namespace):
1625 """
1626 Obtains the data of the specified service in the k8cluster.
1627
1628 :param cluster_id: id of a K8s cluster known by OSM
1629 :param service_name: name of the K8s service in the specified namespace
1630 :param namespace: K8s namespace used by the KDU instance
1631 :return: If successful, it will return a service with the following data:
1632 - `name` of the service
1633 - `type` type of service in the k8 cluster
1634 - `ports` List of ports offered by the service, for each port includes at least
1635 name, port, protocol
1636 - `cluster_ip` Internal ip to be used inside k8s cluster
1637 - `external_ip` List of external ips (in case they are available)
1638 """
1639
1640 # init config, env
1641 paths, env = self._init_paths_env(
1642 cluster_name=cluster_id, create_if_not_exist=True
1643 )
1644
1645 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1646 self.kubectl_command, paths["kube_config"], namespace, service_name
1647 )
1648
1649 output, _rc = await self._local_async_exec(
1650 command=command, raise_exception_on_error=True, env=env
1651 )
1652
1653 data = yaml.load(output, Loader=yaml.SafeLoader)
1654
1655 service = {
1656 "name": service_name,
1657 "type": self._get_deep(data, ("spec", "type")),
1658 "ports": self._get_deep(data, ("spec", "ports")),
1659 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1660 }
1661 if service["type"] == "LoadBalancer":
1662 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1663 ip_list = [elem["ip"] for elem in ip_map_list]
1664 service["external_ip"] = ip_list
1665
1666 return service
1667
1668 async def _exec_get_command(
1669 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
1670 ):
1671 """Obtains information about the kdu instance."""
1672
1673 full_command = self._get_get_command(
1674 get_command, kdu_instance, namespace, kubeconfig
1675 )
1676
1677 output, _rc = await self._local_async_exec(command=full_command)
1678
1679 return output
1680
1681 async def _exec_inspect_command(
1682 self, inspect_command: str, kdu_model: str, repo_url: str = None
1683 ):
1684 """Obtains information about a kdu, no cluster (no env)."""
1685
1686 repo_str = ""
1687 if repo_url:
1688 repo_str = " --repo {}".format(repo_url)
1689
1690 idx = kdu_model.find("/")
1691 if idx >= 0:
1692 idx += 1
1693 kdu_model = kdu_model[idx:]
1694
1695 kdu_model, version = self._split_version(kdu_model)
1696 if version:
1697 version_str = "--version {}".format(version)
1698 else:
1699 version_str = ""
1700
1701 full_command = self._get_inspect_command(
1702 inspect_command, kdu_model, repo_str, version_str
1703 )
1704
1705 output, _rc = await self._local_async_exec(command=full_command)
1706
1707 return output
1708
1709 async def _get_replica_count_url(
1710 self,
1711 kdu_model: str,
1712 repo_url: str,
1713 resource_name: str = None,
1714 ):
1715 """Get the replica count value in the Helm Chart Values.
1716
1717 Args:
1718 kdu_model: The name or path of a bundle
1719 repo_url: Helm Chart repository url
1720 resource_name: Resource name
1721
1722 Returns:
1723 True if replicas, False replicaCount
1724 """
1725
1726 kdu_values = yaml.load(
1727 await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
1728 )
1729
1730 if not kdu_values:
1731 raise K8sException(
1732 "kdu_values not found for kdu_model {}".format(kdu_model)
1733 )
1734
1735 if resource_name:
1736 kdu_values = kdu_values.get(resource_name, None)
1737
1738 if not kdu_values:
1739 msg = "resource {} not found in the values in model {}".format(
1740 resource_name, kdu_model
1741 )
1742 self.log.error(msg)
1743 raise K8sException(msg)
1744
1745 duplicate_check = False
1746
1747 replica_str = ""
1748 replicas = None
1749
1750 if kdu_values.get("replicaCount", None):
1751 replicas = kdu_values["replicaCount"]
1752 replica_str = "replicaCount"
1753 elif kdu_values.get("replicas", None):
1754 duplicate_check = True
1755 replicas = kdu_values["replicas"]
1756 replica_str = "replicas"
1757 else:
1758 if resource_name:
1759 msg = (
1760 "replicaCount or replicas not found in the resource"
1761 "{} values in model {}. Cannot be scaled".format(
1762 resource_name, kdu_model
1763 )
1764 )
1765 else:
1766 msg = (
1767 "replicaCount or replicas not found in the values"
1768 "in model {}. Cannot be scaled".format(kdu_model)
1769 )
1770 self.log.error(msg)
1771 raise K8sException(msg)
1772
1773 # Control if replicas and replicaCount exists at the same time
1774 msg = "replicaCount and replicas are exists at the same time"
1775 if duplicate_check:
1776 if "replicaCount" in kdu_values:
1777 self.log.error(msg)
1778 raise K8sException(msg)
1779 else:
1780 if "replicas" in kdu_values:
1781 self.log.error(msg)
1782 raise K8sException(msg)
1783
1784 return replicas, replica_str
1785
1786 async def _get_replica_count_instance(
1787 self,
1788 kdu_instance: str,
1789 namespace: str,
1790 kubeconfig: str,
1791 resource_name: str = None,
1792 ):
1793 """Get the replica count value in the instance.
1794
1795 Args:
1796 kdu_instance: The name of the KDU instance
1797 namespace: KDU instance namespace
1798 kubeconfig:
1799 resource_name: Resource name
1800
1801 Returns:
1802 True if replicas, False replicaCount
1803 """
1804
1805 kdu_values = yaml.load(
1806 await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
1807 Loader=yaml.SafeLoader,
1808 )
1809
1810 replicas = None
1811
1812 if kdu_values:
1813 resource_values = (
1814 kdu_values.get(resource_name, None) if resource_name else None
1815 )
1816 replicas = (
1817 (
1818 resource_values.get("replicaCount", None)
1819 or resource_values.get("replicas", None)
1820 )
1821 if resource_values
1822 else (
1823 kdu_values.get("replicaCount", None)
1824 or kdu_values.get("replicas", None)
1825 )
1826 )
1827
1828 return replicas
1829
1830 async def _store_status(
1831 self,
1832 cluster_id: str,
1833 operation: str,
1834 kdu_instance: str,
1835 namespace: str = None,
1836 check_every: float = 10,
1837 db_dict: dict = None,
1838 run_once: bool = False,
1839 ):
1840 while True:
1841 try:
1842 await asyncio.sleep(check_every)
1843 detailed_status = await self._status_kdu(
1844 cluster_id=cluster_id,
1845 kdu_instance=kdu_instance,
1846 yaml_format=False,
1847 namespace=namespace,
1848 )
1849 status = detailed_status.get("info").get("description")
1850 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1851 # write status to db
1852 result = await self.write_app_status_to_db(
1853 db_dict=db_dict,
1854 status=str(status),
1855 detailed_status=str(detailed_status),
1856 operation=operation,
1857 )
1858 if not result:
1859 self.log.info("Error writing in database. Task exiting...")
1860 return
1861 except asyncio.CancelledError:
1862 self.log.debug("Task cancelled")
1863 return
1864 except Exception as e:
1865 self.log.debug(
1866 "_store_status exception: {}".format(str(e)), exc_info=True
1867 )
1868 pass
1869 finally:
1870 if run_once:
1871 return
1872
1873 # params for use in -f file
1874 # returns values file option and filename (in order to delete it at the end)
1875 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1876
1877 if params and len(params) > 0:
1878 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1879
1880 def get_random_number():
1881 r = random.randrange(start=1, stop=99999999)
1882 s = str(r)
1883 while len(s) < 10:
1884 s = "0" + s
1885 return s
1886
1887 params2 = dict()
1888 for key in params:
1889 value = params.get(key)
1890 if "!!yaml" in str(value):
1891 value = yaml.safe_load(value[7:])
1892 params2[key] = value
1893
1894 values_file = get_random_number() + ".yaml"
1895 with open(values_file, "w") as stream:
1896 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1897
1898 return "-f {}".format(values_file), values_file
1899
1900 return "", None
1901
1902 # params for use in --set option
1903 @staticmethod
1904 def _params_to_set_option(params: dict) -> str:
1905 params_str = ""
1906 if params and len(params) > 0:
1907 start = True
1908 for key in params:
1909 value = params.get(key, None)
1910 if value is not None:
1911 if start:
1912 params_str += "--set "
1913 start = False
1914 else:
1915 params_str += ","
1916 params_str += "{}={}".format(key, value)
1917 return params_str
1918
1919 @staticmethod
1920 def generate_kdu_instance_name(**kwargs):
1921 chart_name = kwargs["kdu_model"]
1922 # check embeded chart (file or dir)
1923 if chart_name.startswith("/"):
1924 # extract file or directory name
1925 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1926 # check URL
1927 elif "://" in chart_name:
1928 # extract last portion of URL
1929 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1930
1931 name = ""
1932 for c in chart_name:
1933 if c.isalpha() or c.isnumeric():
1934 name += c
1935 else:
1936 name += "-"
1937 if len(name) > 35:
1938 name = name[0:35]
1939
1940 # if does not start with alpha character, prefix 'a'
1941 if not name[0].isalpha():
1942 name = "a" + name
1943
1944 name += "-"
1945
1946 def get_random_number():
1947 r = random.randrange(start=1, stop=99999999)
1948 s = str(r)
1949 s = s.rjust(10, "0")
1950 return s
1951
1952 name = name + get_random_number()
1953 return name.lower()
1954
1955 def _split_version(self, kdu_model: str) -> (str, str):
1956 version = None
1957 if ":" in kdu_model:
1958 parts = kdu_model.split(sep=":")
1959 if len(parts) == 2:
1960 version = str(parts[1])
1961 kdu_model = parts[0]
1962 return kdu_model, version
1963
1964 async def _split_repo(self, kdu_model: str) -> str:
1965 repo_name = None
1966 idx = kdu_model.find("/")
1967 if idx >= 0:
1968 repo_name = kdu_model[:idx]
1969 return repo_name
1970
1971 async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
1972 repo_url = None
1973 idx = kdu_model.find("/")
1974 if idx >= 0:
1975 repo_name = kdu_model[:idx]
1976 # Find repository link
1977 local_repo_list = await self.repo_list(cluster_uuid)
1978 for repo in local_repo_list:
1979 repo_url = repo["url"] if repo["name"] == repo_name else None
1980 return repo_url