Update the repo for a helm KDU before install and upgrade
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Set up the state shared by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :raises K8sException: if the kubectl or the helm executable does not exist
    """

    # parent class initialisation (db, log and update callback are handed over)
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # random numbers are used for release name generation
    random.seed(time.time())

    # the file system
    self.fs = fs

    # both executables are mandatory: an exception is raised if one is missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url comes from the config; the literal string "None"
    # is treated as "no stable repo"
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
92
def _get_namespace(self, cluster_uuid: str) -> str:
    """
    Obtains the namespace used by the cluster with the uuid passed by argument

    :param cluster_uuid: cluster's uuid
    :return: value of the "namespace" field of the cluster's database record
    """

    # NOTE(review): fail_on_empty=False presumably makes get_one return None
    # for an unknown uuid, in which case the .get() below would fail - confirm
    cluster_record = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    return cluster_record.get("namespace")
105
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse; when empty a
        new uuid is generated
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and the value returned by the helm
        version specific cluster initialization (True if connector has
        installed some software in the cluster)
        (on error, an exception will be raised)
    """

    cluster_id = reuse_cluster_uuid if reuse_cluster_uuid else str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The kubeconfig holds credentials, so it must be owner-only from the
    # moment it is created. The third positional argument of the builtin
    # open() is the buffer size, not the permission bits, therefore the file
    # is created through os.open(), which does take the mode.
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # os.open() only applies the mode when it creates the file; reset the
    # permissions explicitly for a kubeconfig that already existed
    os.chmod(paths["kube_config"], mode)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
153
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository for a cluster and then update it.

    A failure of "repo add" is raised by the command executor; the result
    of the following "repo update" is not checked.

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name under which the repository is registered
    :param url: URL of the repository
    :param repo_type: only used in the debug log message
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    kubeconfig = paths["kube_config"]

    # helm repo add name url
    add_cmd = "env KUBECONFIG={} {} repo add {} {}".format(
        kubeconfig, self._helm_command, name, url
    )
    self.log.debug("adding repo: {}".format(add_cmd))
    await self._local_async_exec(
        command=add_cmd, raise_exception_on_error=True, env=env
    )

    # helm repo update
    update_cmd = "env KUBECONFIG={} {} repo update {}".format(
        kubeconfig, self._helm_command, name
    )
    self.log.debug("updating repo: {}".format(update_cmd))
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
191
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Run "helm repo update" for one repository of a cluster.

    The command is executed with raise_exception_on_error=False and its
    result is discarded.

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name of the repository to update
    :param repo_type: only used in the debug log message
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(
            cluster_uuid, repo_type, name
        )
    )

    # init_env (only the helm environment is needed by the command below)
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    update_cmd = "{} repo update {}".format(self._helm_command, name)
    self.log.debug("updating repo: {}".format(update_cmd))
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
216
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :return: repositories parsed from the YAML output of "helm repo list",
        passed through _lower_keys_list; an empty list when the command
        fails or prints nothing
    """

    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    list_cmd = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # Errors are not raised because if there are no repos just want an empty list
    output, _rc = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    if _rc != 0 or not output:
        return []

    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed_repos)
255
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Run "helm repo remove" for one repository of a cluster.

    The command is executed with raise_exception_on_error=True.

    :param cluster_uuid: uuid of the cluster whose helm environment is used
    :param name: name of the repository to remove
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    remove_cmd = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, name
    )
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
278
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Optionally uninstalls the helm releases and the helm software of the
    cluster, then deletes the cluster directory through the file system
    object and removes the local copy of that directory.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: only read when uninstall_sw is True and releases exist:
        True uninstalls every release, False keeps them and skips the
        software uninstallation
    :param uninstall_sw: uninstall the releases (subject to force) and call
        the helm version specific software uninstallation
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases and force:
            for release in releases:
                # bound before the try so the warning below never reports
                # the name of a previous release (or fails on an unbound name)
                kdu_instance = None
                try:
                    kdu_instance = release.get("name")
                    chart = release.get("chart")
                    self.log.debug(
                        "Uninstalling {} -> {}".format(chart, kdu_instance)
                    )
                    await self.uninstall(
                        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                    )
                except Exception as e:
                    # will not raise exception as it was found
                    # that in some cases of previously installed helm releases it
                    # raised an error
                    self.log.warning(
                        "Error uninstalling release {}: {}".format(
                            kdu_instance, e
                        )
                    )
        elif releases:
            msg = (
                "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
            ).format(cluster_uuid)
            self.log.warning(msg)
            # Allow to remove k8s cluster without removing Tiller
            uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
350
async def _install_impl(
    self,
    cluster_id: str,
    kdu_model: str,
    paths: dict,
    env: dict,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Install a KDU with helm and store the resulting status.

    :param cluster_id: uuid of the target cluster
    :param kdu_model: chart reference, optionally suffixed with ":<version>"
    :param paths: recomputed below from cluster_id; the received value is not used
    :param env: recomputed below from cluster_id; the received value is not used
    :param kdu_instance: name of the release to install
    :param atomic: passed to the install command; when True a second task
        runs _store_status(run_once=False) while helm is executing
    :param timeout: passed to the install command
    :param params: values for the chart, converted to a file option
    :param db_dict: passed to _store_status
    :param kdu_name: not used by this method
    :param namespace: passed to the install command and to _store_status
    :raises K8sException: if the helm command ends with a non-zero return code
    """
    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    # version
    version = None
    if ":" in kdu_model:
        parts = kdu_model.split(sep=":")
        if len(parts) == 2:
            version = str(parts[1])
            kdu_model = parts[0]

    repo = self._split_repo(kdu_model)
    if repo:
        # repo_update is a coroutine function: without the await the
        # repository would never be updated before the installation
        await self.repo_update(cluster_id, repo)

    command = self._get_install_command(
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        paths["kube_config"],
    )

    self.log.debug("installing: {}".format(command))

    if atomic:
        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )

        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                db_dict=db_dict,
                operation="install",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # the status task is only useful while helm is running
        status_task.cancel()

        output, rc = exec_task.result()

    else:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="install",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)
453
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance with helm.

    :param cluster_uuid: uuid of the cluster where the instance lives
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference, optionally suffixed with ":<version>".
        NOTE(review): the default is None but the version parsing below
        needs a string - confirm that callers always pass it
    :param atomic: passed to the upgrade command; when True a second task
        runs _store_status(run_once=False) while helm is executing
    :param timeout: passed to the upgrade command
    :param params: values for the chart, converted to a file option
    :param db_dict: passed to _store_status
    :return: revision of the instance after the upgrade, or 0 if the
        instance cannot be found afterwards
    :raises K8sException: if the instance does not exist or the helm command
        ends with a non-zero return code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    # version
    version = None
    if ":" in kdu_model:
        parts = kdu_model.split(sep=":")
        if len(parts) == 2:
            version = str(parts[1])
            kdu_model = parts[0]

    repo = self._split_repo(kdu_model)
    if repo:
        # repo_update is a coroutine function: without the await the
        # repository would never be updated before the upgrade
        await self.repo_update(cluster_uuid, repo)

    command = self._get_upgrade_command(
        kdu_model,
        kdu_instance,
        instance_info["namespace"],
        params_str,
        version,
        atomic,
        timeout,
        paths["kube_config"],
    )

    self.log.debug("upgrading: {}".format(command))

    if atomic:
        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="upgrade",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # the status task is only useful while helm is running
        status_task.cancel()
        output, rc = exec_task.result()

    else:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
578
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scaling is not implemented here: the call always fails.

    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
588
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Reading the scale count is not implemented here: the call always fails.

    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
596
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a KDU instance back to a given revision with helm.

    :param cluster_uuid: uuid of the cluster where the instance lives
    :param kdu_instance: name of the release to roll back
    :param revision: revision passed to the rollback command
    :param db_dict: passed to _store_status
    :return: revision of the instance after the rollback, or 0 if the
        instance cannot be found afterwards
    :raises K8sException: if the instance does not exist or the helm command
        ends with a non-zero return code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the instance is needed to obtain its namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    namespace = instance_info["namespace"]
    command = self._get_rollback_command(
        kdu_instance, namespace, revision, paths["kube_config"]
    )

    self.log.debug("rolling_back: {}".format(command))

    # helm runs in one task ...
    helm_task = asyncio.ensure_future(
        coro_or_future=self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # ... while the status is written in another one
    progress_task = asyncio.ensure_future(
        coro_or_future=self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    # only the helm task is waited for; the status task is cancelled afterwards
    await asyncio.wait([helm_task])
    progress_task.cancel()

    output, rc = helm_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    refreshed = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not refreshed:
        return 0
    revision = int(refreshed.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
683
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True if the instance is not found; otherwise the output of the
        uninstall command converted by _output_to_table
    """

    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the instance is needed to obtain its namespace; a missing instance
    # is only logged, it is not an error
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    uninstall_cmd = self._get_uninstall_command(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=uninstall_cmd, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(output)
729
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    The listing itself is delegated to the helm version specific
    _instances_list; this method wraps it with the file system syncs.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the value returned by _instances_list
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    releases = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
750
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Find a release of the cluster by its name.

    :param cluster_uuid: cluster whose releases are listed
    :param kdu_instance: release name to look for
    :return: the first listed entry whose "name" equals kdu_instance, or
        None (after a debug log message) when there is none
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    match = next(
        (entry for entry in instances if entry.get("name") == kdu_instance), None
    )
    if match is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return match
758
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Not available for helm based KDUs: the call always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    reason = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(reason)
785
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get list of services names for kdu
    service_names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    # the details are fetched one service at a time, in listing order
    service_list = [
        await self._get_service(cluster_uuid, service_name, namespace)
        for service_name in service_names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
833
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of one service, wrapped with the file system syncs.

    :param cluster_uuid: UUID of the K8s cluster
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: the value returned by _get_service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    service_data = await self._get_service(cluster_uuid, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_data
853
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
) -> Union[str, dict]:
    """
    This call retrieves the current state of a given KDU instance. It allows
    to retrieve the _composition_ (i.e. K8s objects) and _specific values_ of
    the configuration parameters applied to a given instance. This call is
    based on the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param yaml_format: if the result shall be returned as a YAML string or as a
        dictionary
    :param kwargs: Additional parameters (None yet)
    :return: the value returned by the helm version specific _status_kdu.
        According to the interface description it carries:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
              - UNKNOWN
              - DEPLOYED
              - DELETED
              - SUPERSEDED
              - FAILED or
              - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
          and the status of those resources
        - Last `deployment_time`.
    :raises K8sException: if the instance is not found in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    instances = await self._instances_list(cluster_id=cluster_uuid)
    instance = next(
        (entry for entry in instances if entry.get("name") == kdu_instance), None
    )
    if instance is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
916
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the "values" of a KDU model.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL
    :return: the value returned by the inspect command execution
    """

    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    inspection = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
928
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the "readme" of a KDU model.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL
    :return: the value returned by the inspect command execution
    """

    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
938
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repositories of a cluster with the ones in the database.

    Repositories of the database that are missing locally, or registered
    with a different URL, are (re)added. Local repositories unknown to the
    database are removed, except the one named "stable".

    :param cluster_uuid: uuid of the cluster to synchronize
    :return: tuple (deleted_repo_list, added_repo_dict). deleted_repo_list
        holds the database id of each re-added repository and the name of
        each removed local repository; added_repo_dict maps database id to
        repository name
    :raises K8sException: if a repository cannot be added
    :raises Exception: on any other error; a failure removing a local-only
        repository is only logged
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # resolved before the try so that the error message below
            # always refers to the repository being processed
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and and again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: the logger must be used here, the
                    # connector itself has no "warning" method
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
1004
def _get_db_repos_dict(self, repo_ids: list):
    """
    Read the "k8srepos" records of the given ids from the database.

    :param repo_ids: identifiers of the repositories to read
    :return: dictionary mapping each repository name to its database record
    """
    records = (self.db.get_one("k8srepos", {"_id": repo_id}) for repo_id in repo_ids)
    return {record["name"]: record for record in records}
1011
1012 """
1013 ####################################################################################
1014 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1015 ####################################################################################
1016 """
1017
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    The helm3 dirs are also created according to the new directory
    specification; their paths are not returned but assigned to the helm
    environment variables.

    :param cluster_name: cluster_name
    :param create_if_not_exist: NOTE(review): presumably controls whether
        missing dirs are created - confirm in the implementations
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
1028
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Helm version dependent cluster initialization, to be implemented by
    the subclasses. Its result is returned by init_env as the
    "software installed" flag.
    """
1034
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Helm version dependent listing of the helm instances of a cluster, to
    be implemented by the subclasses
    """
1040
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Helm version dependent method to obtain the services of a helm
    instance, to be implemented by the subclasses
    """
1046
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict]:
    """
    Helm version dependent method to obtain the status of a helm instance,
    to be implemented by the subclasses
    """
1059
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain the command to be executed to install the indicated instance
    """
1075
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain the command to be executed to upgrade the indicated instance
    (implemented by the subclasses)
    """
1091
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain the command to be executed to roll back the indicated instance
    (implemented by the subclasses)
    """
1099
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain the command to be executed to delete the indicated instance
    (implemented by the subclasses)
    """
1107
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain the command to be executed to obtain information about the kdu
    (implemented by the subclasses)
    """
1115
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method called to uninstall the cluster software for helm. This method
    depends on the helm version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: identifier of the cluster
    :param namespace: namespace the software was installed in
    """
1124
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    Abstract method: it has no body here and must be overridden by the
    helm-version-specific connector.

    :param cluster_uuid: identifier of the cluster
    :return: list with the identifiers of the repos
    """
1130
1131 """
1132 ####################################################################################
1133 ################################### P R I V A T E ##################################
1134 ####################################################################################
1135 """
1136
1137 @staticmethod
1138 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1139 if os.path.exists(filename):
1140 return True
1141 else:
1142 msg = "File {} does not exist".format(filename)
1143 if exception_if_not_exists:
1144 raise K8sException(msg)
1145
1146 @staticmethod
1147 def _remove_multiple_spaces(strobj):
1148 strobj = strobj.strip()
1149 while " " in strobj:
1150 strobj = strobj.replace(" ", " ")
1151 return strobj
1152
1153 @staticmethod
1154 def _output_to_lines(output: str) -> list:
1155 output_lines = list()
1156 lines = output.splitlines(keepends=False)
1157 for line in lines:
1158 line = line.strip()
1159 if len(line) > 0:
1160 output_lines.append(line)
1161 return output_lines
1162
1163 @staticmethod
1164 def _output_to_table(output: str) -> list:
1165 output_table = list()
1166 lines = output.splitlines(keepends=False)
1167 for line in lines:
1168 line = line.replace("\t", " ")
1169 line_list = list()
1170 output_table.append(line_list)
1171 cells = line.split(sep=" ")
1172 for cell in cells:
1173 cell = cell.strip()
1174 if len(cell) > 0:
1175 line_list.append(cell)
1176 return output_table
1177
1178 @staticmethod
1179 def _parse_services(output: str) -> list:
1180 lines = output.splitlines(keepends=False)
1181 services = []
1182 for line in lines:
1183 line = line.replace("\t", " ")
1184 cells = line.split(sep=" ")
1185 if len(cells) > 0 and cells[0].startswith("service/"):
1186 elems = cells[0].split(sep="/")
1187 if len(elems) > 1:
1188 services.append(elems[1])
1189 return services
1190
1191 @staticmethod
1192 def _get_deep(dictionary: dict, members: tuple):
1193 target = dictionary
1194 value = None
1195 try:
1196 for m in members:
1197 value = target.get(m)
1198 if not value:
1199 return None
1200 else:
1201 target = value
1202 except Exception:
1203 pass
1204 return value
1205
1206 # find key:value in several lines
1207 @staticmethod
1208 def _find_in_lines(p_lines: list, p_key: str) -> str:
1209 for line in p_lines:
1210 try:
1211 if line.startswith(p_key + ":"):
1212 parts = line.split(":")
1213 the_value = parts[1].strip()
1214 return the_value
1215 except Exception:
1216 # ignore it
1217 pass
1218 return None
1219
1220 @staticmethod
1221 def _lower_keys_list(input_list: list):
1222 """
1223 Transform the keys in a list of dictionaries to lower case and returns a new list
1224 of dictionaries
1225 """
1226 new_list = []
1227 if input_list:
1228 for dictionary in input_list:
1229 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1230 new_list.append(new_dict)
1231 return new_list
1232
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command without a shell and collect its output.

    :param command: command line; it is normalized and split with shlex
    :param raise_exception_on_error: raise K8sException on a non-zero return
        code or on an execution error, instead of reporting it in the result
    :param show_error_log: log the output when the return code is not zero
    :param encode_utf8: return the str() representation of the UTF-8 encoded
        output, with the escaped newlines turned into real ones
    :param env: extra environment variables, added on top of os.environ
    :return: tuple (output, return code). On success the output is stdout
        (stderr only if stdout is empty); on failure it is stderr (stdout
        only if stderr is empty). ("", -1) if the command could not be
        executed and raise_exception_on_error is False
    :raises K8sException: see raise_exception_on_error
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""
        # A successful command may still print warnings to stderr; they must
        # not replace the real result, so stderr only wins on failure.
        if return_code == 0:
            output = out_text or err_text
        else:
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1302
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Run "command1 | command2" locally, without a shell, and collect the
    standard output of command2.

    The standard error of both commands is not captured, and the return code
    of command1 is not checked.

    :param command1: command producing the data
    :param command2: command consuming the data through its standard input
    :param raise_exception_on_error: raise K8sException when command2 ends
        with a non-zero return code or on an execution error
    :param show_error_log: log the output when the return code is not zero
    :param encode_utf8: return the str() representation of the UTF-8 encoded
        output, with the escaped newlines turned into real ones
    :param env: extra environment variables, added on top of os.environ
    :return: tuple (output of command2, return code of command2), or
        ("", -1) if the commands could not be executed and
        raise_exception_on_error is False
    :raises K8sException: see raise_exception_on_error
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read, write = os.pipe()
        # The parent copies of both pipe ends are always closed, also when
        # one of the commands cannot be started.
        try:
            try:
                await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
            finally:
                os.close(write)
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
            )
        finally:
            os.close(read)

        # stderr is not piped, so only stdout can be collected here
        stdout, _ = await process_2.communicate()

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1374
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, and empty while no ingress
    ip has been assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # The ingress list is missing while the load balancer is pending,
        # and an entry does not necessarily carry an "ip" key.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1418
1419 async def _exec_inspect_comand(
1420 self, inspect_command: str, kdu_model: str, repo_url: str = None
1421 ):
1422 """
1423 Obtains information about a kdu, no cluster (no env)
1424 """
1425
1426 repo_str = ""
1427 if repo_url:
1428 repo_str = " --repo {}".format(repo_url)
1429
1430 idx = kdu_model.find("/")
1431 if idx >= 0:
1432 idx += 1
1433 kdu_model = kdu_model[idx:]
1434
1435 version = ""
1436 if ":" in kdu_model:
1437 parts = kdu_model.split(sep=":")
1438 if len(parts) == 2:
1439 version = "--version {}".format(str(parts[1]))
1440 kdu_model = parts[0]
1441
1442 full_command = self._get_inspect_command(
1443 inspect_command, kdu_model, repo_str, version
1444 )
1445 output, _rc = await self._local_async_exec(
1446 command=full_command, encode_utf8=True
1447 )
1448
1449 return output
1450
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Poll the status of a KDU instance and write it to the database.

    Every iteration sleeps first, then reads the status with _status_kdu and
    stores it with write_app_status_to_db. The loop ends when the task is
    cancelled, when the database write reports a failure or, if run_once is
    set, after the first iteration whatever its outcome.

    :param cluster_id: cluster where the instance is deployed
    :param operation: operation name, stored together with the status
    :param kdu_instance: name of the instance
    :param namespace: namespace of the instance
    :param check_every: seconds to sleep before each status read
    :param db_dict: passed to write_app_status_to_db as is
    :param run_once: do a single iteration instead of looping
    """
    while True:
        try:
            # the wait also applies to the first (or only) iteration
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            # cancellation is not propagated: the task ends quietly
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # any other failure is only logged and the polling goes on
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )
            pass
        finally:
            # runs on every path above, so run_once always stops here
            if run_once:
                return
1493
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the parameters to a YAML values file and build the "-f" option.

    A string value containing "!!yaml" is parsed as YAML after dropping its
    first 7 characters (the "!!yaml" marker plus one separator character),
    and the parsed object is what gets written to the file. The file is
    created with a random name in the current working directory; removing it
    is up to the caller.

    :param cluster_id: cluster whose paths are initialized before writing
    :param params: parameters to write
    :return: tuple ("-f <file>", "<file>"), or ("", None) if there are no
        parameters
    """

    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    params2 = dict()
    for key, value in params.items():
        # only strings can carry the marker and be sliced
        if isinstance(value, str) and "!!yaml" in value:
            # Explicit safe loader: calling yaml.load without a Loader is
            # rejected by recent PyYAML releases, and the parameters must
            # not be able to build arbitrary python objects.
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        params2[key] = value

    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    values_file = random_number + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1522
1523 # params for use in --set option
1524 @staticmethod
1525 def _params_to_set_option(params: dict) -> str:
1526 params_str = ""
1527 if params and len(params) > 0:
1528 start = True
1529 for key in params:
1530 value = params.get(key, None)
1531 if value is not None:
1532 if start:
1533 params_str += "--set "
1534 start = False
1535 else:
1536 params_str += ","
1537 params_str += "{}={}".format(key, value)
1538 return params_str
1539
1540 @staticmethod
1541 def generate_kdu_instance_name(**kwargs):
1542 chart_name = kwargs["kdu_model"]
1543 # check embeded chart (file or dir)
1544 if chart_name.startswith("/"):
1545 # extract file or directory name
1546 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1547 # check URL
1548 elif "://" in chart_name:
1549 # extract last portion of URL
1550 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1551
1552 name = ""
1553 for c in chart_name:
1554 if c.isalpha() or c.isnumeric():
1555 name += c
1556 else:
1557 name += "-"
1558 if len(name) > 35:
1559 name = name[0:35]
1560
1561 # if does not start with alpha character, prefix 'a'
1562 if not name[0].isalpha():
1563 name = "a" + name
1564
1565 name += "-"
1566
1567 def get_random_number():
1568 r = random.randrange(start=1, stop=99999999)
1569 s = str(r)
1570 s = s.rjust(10, "0")
1571 return s
1572
1573 name = name + get_random_number()
1574 return name.lower()
1575
1576 async def _split_repo(self, kdu_model: str) -> str:
1577 repo_name = None
1578 idx = kdu_model.find("/")
1579 if idx >= 0:
1580 repo_name = kdu_model[:idx]
1581 return repo_name