Bug 1982 fixed
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Set up the state shared by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :raises K8sException: when the kubectl or helm executable does not exist
    """

    # the parent connector receives the database, the logger and the callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # random numbers for release name generation
    random.seed(time.time())

    self.fs = fs

    # both executables must exist; the check raises otherwise
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # the configuration may carry the literal text "None"; map it to None
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
92
93 def _get_namespace(self, cluster_uuid: str) -> str:
94 """
95 Obtains the namespace used by the cluster with the uuid passed by argument
96
97 param: cluster_uuid: cluster's uuid
98 """
99
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster = self.db.get_one(
102 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
103 )
104 return k8scluster.get("namespace")
105
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and the value returned by the helm version
        specific initialization (`_cluster_init`)
        (on error, an exception will be raised)
    """

    # a fresh uuid is generated unless the caller asks to reuse one
    cluster_id = reuse_cluster_uuid or str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # Create the credentials file already restricted to its owner. The third
    # positional argument of the builtin open() is `buffering`, not a permission
    # mask, so the mask has to be given to os.open().
    kube_config = paths["kube_config"]
    fd = os.open(
        kube_config,
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        stat.S_IRUSR | stat.S_IWUSR,
    )
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # os.open() only applies the mode to newly created files; enforce it also
    # when the file already existed
    os.chmod(kube_config, 0o600)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
153
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository in the helm environment of a cluster.

    First runs `helm repo update` (with raise_exception_on_error=False) and then
    `helm repo add` (with raise_exception_on_error=True).

    :param cluster_uuid: identifier of the cluster whose environment is used
    :param name: name under which the repository is registered
    :param url: URL of the repository
    :param repo_type: only used in the debug log message
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    command = "env KUBECONFIG={} {} repo update".format(
        paths["kube_config"], self._helm_command
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    # name and url come from the caller: quote them so that blanks or shell
    # metacharacters cannot add extra arguments to the command line.
    # NOTE(review): assumes _local_async_exec tokenizes the command with shell
    # rules (e.g. shlex.split) — confirm
    command = "env KUBECONFIG={} {} repo add {} {}".format(
        paths["kube_config"],
        self._helm_command,
        shlex.quote(name),
        shlex.quote(url),
    )
    self.log.debug("adding repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
191
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: identifier of the cluster whose helm environment is read
    :return: the parsed `helm repo list --output yaml` result after passing it
        through `_lower_keys_list`, or an empty list when the command returns a
        non-zero code or produces no output
    """

    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    list_command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # errors are not raised: a failing command is treated as "no repositories"
    output, _rc = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    if _rc != 0 or not output:
        return []

    # unify format between helm2 and helm3 setting all keys lowercase
    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    return self._lower_keys_list(parsed_repos)
230
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the helm environment of a cluster.

    :param cluster_uuid: identifier of the cluster whose environment is used
    :param name: name of the repository to remove
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the name comes from the caller: quote it so that blanks or shell
    # metacharacters cannot add extra arguments to the command line.
    # NOTE(review): assumes _local_async_exec tokenizes the command with shell
    # rules (e.g. shlex.split) — confirm
    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
253
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the local and shared directories of the cluster and, when requested,
    the releases deployed in it and the helm software installed in the cluster.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: when releases exist, uninstall them instead of leaving the
        helm environment untouched
    :param uninstall_sw: request the removal of the releases and of the helm
        cluster software (`_uninstall_sw`)
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # bound before the try so the handler can always log it
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_uuid)
                self.log.warning(msg)
                # Allow to remove k8s cluster without removing Tiller
                uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
325
326 async def _install_impl(
327 self,
328 cluster_id: str,
329 kdu_model: str,
330 paths: dict,
331 env: dict,
332 kdu_instance: str,
333 atomic: bool = True,
334 timeout: float = 300,
335 params: dict = None,
336 db_dict: dict = None,
337 kdu_name: str = None,
338 namespace: str = None,
339 ):
340 # init env, paths
341 paths, env = self._init_paths_env(
342 cluster_name=cluster_id, create_if_not_exist=True
343 )
344
345 # params to str
346 params_str, file_to_delete = self._params_to_file_option(
347 cluster_id=cluster_id, params=params
348 )
349
350 # version
351 version = None
352 if ":" in kdu_model:
353 parts = kdu_model.split(sep=":")
354 if len(parts) == 2:
355 version = str(parts[1])
356 kdu_model = parts[0]
357
358 command = self._get_install_command(
359 kdu_model,
360 kdu_instance,
361 namespace,
362 params_str,
363 version,
364 atomic,
365 timeout,
366 paths["kube_config"],
367 )
368
369 self.log.debug("installing: {}".format(command))
370
371 if atomic:
372 # exec helm in a task
373 exec_task = asyncio.ensure_future(
374 coro_or_future=self._local_async_exec(
375 command=command, raise_exception_on_error=False, env=env
376 )
377 )
378
379 # write status in another task
380 status_task = asyncio.ensure_future(
381 coro_or_future=self._store_status(
382 cluster_id=cluster_id,
383 kdu_instance=kdu_instance,
384 namespace=namespace,
385 db_dict=db_dict,
386 operation="install",
387 run_once=False,
388 )
389 )
390
391 # wait for execution task
392 await asyncio.wait([exec_task])
393
394 # cancel status task
395 status_task.cancel()
396
397 output, rc = exec_task.result()
398
399 else:
400
401 output, rc = await self._local_async_exec(
402 command=command, raise_exception_on_error=False, env=env
403 )
404
405 # remove temporal values yaml file
406 if file_to_delete:
407 os.remove(file_to_delete)
408
409 # write final status
410 await self._store_status(
411 cluster_id=cluster_id,
412 kdu_instance=kdu_instance,
413 namespace=namespace,
414 db_dict=db_dict,
415 operation="install",
416 run_once=True,
417 check_every=0,
418 )
419
420 if rc != 0:
421 msg = "Error executing command: {}\nOutput: {}".format(command, output)
422 self.log.error(msg)
423 raise K8sException(msg)
424
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing release and return its new revision number.

    :param cluster_uuid: identifier of the cluster
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference; a single ":" separates an optional version
    :param atomic: forwarded to the upgrade command; when true the status is
        also stored from a parallel task while helm runs
    :param timeout: forwarded to the upgrade command
    :param params: values converted to a file option for the command
    :param db_dict: forwarded to `_store_status`
    :return: revision of the release after the upgrade, 0 when the release
        cannot be found afterwards
    :raises K8sException: when the release does not exist or helm returns a
        non-zero code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    # "model:version" is split only when there is exactly one colon
    version = None
    if ":" in kdu_model:
        model_parts = kdu_model.split(sep=":")
        if len(model_parts) == 2:
            kdu_model, version = model_parts[0], str(model_parts[1])

    release_namespace = instance_info["namespace"]
    command = self._get_upgrade_command(
        kdu_model,
        kdu_instance,
        release_namespace,
        params_str,
        version,
        atomic,
        timeout,
        paths["kube_config"],
    )

    self.log.debug("upgrading: {}".format(command))

    if not atomic:
        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    else:
        # exec helm in a task
        helm_task = asyncio.ensure_future(
            self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        progress_task = asyncio.ensure_future(
            self._store_status(
                cluster_id=cluster_uuid,
                kdu_instance=kdu_instance,
                namespace=release_namespace,
                db_dict=db_dict,
                operation="upgrade",
                run_once=False,
            )
        )

        # only the helm task is awaited; the status task is cancelled afterwards
        await asyncio.wait([helm_task])
        progress_task.cancel()
        output, rc = helm_task.result()

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=release_namespace,
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    refreshed = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not refreshed:
        return 0
    revision = int(refreshed.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
545
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scaling is not available in this connector.

    :raises NotImplementedError: always
    """
    reason = "Method not implemented"
    raise NotImplementedError(reason)
555
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Reading the scale count is not available in this connector.

    :raises NotImplementedError: always
    """
    reason = "Method not implemented"
    raise NotImplementedError(reason)
563
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a release back to a revision and return the resulting revision number.

    :param cluster_uuid: identifier of the cluster
    :param kdu_instance: name of the release to roll back
    :param revision: revision forwarded to the rollback command
    :param db_dict: forwarded to `_store_status`
    :return: revision of the release after the rollback, 0 when the release
        cannot be found afterwards
    :raises K8sException: when the release does not exist or helm returns a
        non-zero code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    release_namespace = instance_info["namespace"]
    command = self._get_rollback_command(
        kdu_instance, release_namespace, revision, paths["kube_config"]
    )

    self.log.debug("rolling_back: {}".format(command))

    # exec helm in a task
    helm_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # write status in another task
    progress_task = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=release_namespace,
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    # only the helm task is awaited; the status task is cancelled afterwards
    await asyncio.wait([helm_task])
    progress_task.cancel()

    output, rc = helm_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=release_namespace,
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    refreshed = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not refreshed:
        return 0
    revision = int(refreshed.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
650
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance is not found (nothing to remove); otherwise
        the output of the uninstall command converted with `_output_to_table`
    """

    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the release is looked up first because its namespace is needed
    release = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release:
        self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    uninstall_command = self._get_uninstall_command(
        kdu_instance, release["namespace"], paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=uninstall_command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(output)
696
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    The local directory is synchronized before, and the shared file system
    after, delegating to the helm version specific `_instances_list`.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever `_instances_list` returns for the cluster
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    releases = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
717
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a release by name among the releases of a cluster.

    :param cluster_uuid: identifier of the cluster
    :param kdu_instance: release name to look for
    :return: the first release whose "name" equals kdu_instance, None otherwise
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
725
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Not available for helm: the call always fails.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    unsupported = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported)
752
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    The service names come from `_get_services`; every name is then resolved,
    one after another, with `_get_service`.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: list with one `_get_service` result per service name
        (presumably name, type, ports, cluster_ip and external_ip of each
        service; the exact content is defined by `_get_service` — TODO confirm)
    """

    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get list of services names for kdu
    service_names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    service_list = [
        await self._get_service(cluster_uuid, service_name, namespace)
        for service_name in service_names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
800
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of one service, synchronizing the file system around the call.

    :param cluster_uuid: identifier of the cluster
    :param service_name: name of the service
    :param namespace: namespace of the service
    :return: whatever `_get_service` returns for the service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    service_data = await self._get_service(cluster_uuid, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_data
820
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
) -> Union[str, dict]:
    """
    Retrieve the current state of a given KDU instance.

    The release is located with `_instances_list` to learn its namespace and
    the status itself is produced by the helm version specific `_status_kdu`.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param yaml_format: if the return shall be returned as an YAML string or as a
        dictionary (forwarded to `_status_kdu`)
    :param kwargs: Additional parameters (None yet)
    :return: whatever `_status_kdu` returns for the release
    :raises K8sException: when no release is named kdu_instance
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    releases = await self._instances_list(cluster_id=cluster_uuid)
    release = next(
        (item for item in releases if item.get("name") == kdu_instance), None
    )
    if release is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
883
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "values" inspect command for a chart.

    :param kdu_model: chart to inspect
    :param repo_url: optional repository of the chart
    :return: result of `_exec_inspect_comand` for the "values" command
    """

    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    inspected = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspected
895
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "readme" inspect command for a chart.

    :param kdu_model: chart to inspect
    :param repo_url: optional repository of the chart
    :return: result of `_exec_inspect_comand` for the "readme" command
    """

    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    inspected = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspected
905
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repositories of a cluster with the ones stored in the database.

    Repositories in the database that are missing locally, or registered with a
    different URL, are (re)added. Local repositories unknown to the database are
    removed, except the one named "stable"; a failure removing one of those is
    only logged.

    :param cluster_uuid: identifier of the cluster
    :return: tuple (deleted_repo_list, added_repo_dict). deleted_repo_list holds
        the database ids of re-added repositories and the names of the local
        repositories removed; added_repo_dict maps database id to name
    :raises K8sException: when a repository of the database cannot be added
    :raises Exception: on any other error
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # resolved before the try so the error message can always use it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and add again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repository that cannot be removed does
                    # not abort the synchronization
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # unexpected errors are logged and re-raised as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
971
972 def _get_db_repos_dict(self, repo_ids: list):
973 db_repos_dict = {}
974 for repo_id in repo_ids:
975 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
976 db_repos_dict[db_repo["name"]] = db_repo
977 return db_repos_dict
978
979 """
980 ####################################################################################
981 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
982 ####################################################################################
983 """
984
985 @abc.abstractmethod
986 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
987 """
988 Creates and returns base cluster and kube dirs and returns them.
989 Also created helm3 dirs according to new directory specification, paths are
990 not returned but assigned to helm environment variables
991
992 :param cluster_name: cluster_name
993 :return: Dictionary with config_paths and dictionary with helm environment variables
994 """
995
996 @abc.abstractmethod
997 async def _cluster_init(self, cluster_id, namespace, paths, env):
998 """
999 Implements the helm version dependent cluster initialization
1000 """
1001
1002 @abc.abstractmethod
1003 async def _instances_list(self, cluster_id):
1004 """
1005 Implements the helm version dependent helm instances list
1006 """
1007
1008 @abc.abstractmethod
1009 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1010 """
1011 Implements the helm version dependent method to obtain services from a helm instance
1012 """
1013
1014 @abc.abstractmethod
1015 async def _status_kdu(
1016 self,
1017 cluster_id: str,
1018 kdu_instance: str,
1019 namespace: str = None,
1020 yaml_format: bool = False,
1021 show_error_log: bool = False,
1022 ) -> Union[str, dict]:
1023 """
1024 Implements the helm version dependent method to obtain status of a helm instance
1025 """
1026
1027 @abc.abstractmethod
1028 def _get_install_command(
1029 self,
1030 kdu_model,
1031 kdu_instance,
1032 namespace,
1033 params_str,
1034 version,
1035 atomic,
1036 timeout,
1037 kubeconfig,
1038 ) -> str:
1039 """
1040 Obtain command to be executed to delete the indicated instance
1041 """
1042
1043 @abc.abstractmethod
1044 def _get_upgrade_command(
1045 self,
1046 kdu_model,
1047 kdu_instance,
1048 namespace,
1049 params_str,
1050 version,
1051 atomic,
1052 timeout,
1053 kubeconfig,
1054 ) -> str:
1055 """
1056 Obtain command to be executed to upgrade the indicated instance
1057 """
1058
1059 @abc.abstractmethod
1060 def _get_rollback_command(
1061 self, kdu_instance, namespace, revision, kubeconfig
1062 ) -> str:
1063 """
1064 Obtain command to be executed to rollback the indicated instance
1065 """
1066
1067 @abc.abstractmethod
1068 def _get_uninstall_command(
1069 self, kdu_instance: str, namespace: str, kubeconfig: str
1070 ) -> str:
1071 """
1072 Obtain command to be executed to delete the indicated instance
1073 """
1074
1075 @abc.abstractmethod
1076 def _get_inspect_command(
1077 self, show_command: str, kdu_model: str, repo_str: str, version: str
1078 ):
1079 """
1080 Obtain command to be executed to obtain information about the kdu
1081 """
1082
1083 @abc.abstractmethod
1084 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1085 """
1086 Method call to uninstall cluster software for helm. This method is dependent
1087 of helm version
1088 For Helm v2 it will be called when Tiller must be uninstalled
1089 For Helm v3 it does nothing and does not need to be callled
1090 """
1091
1092 @abc.abstractmethod
1093 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1094 """
1095 Obtains the cluster repos identifiers
1096 """
1097
1098 """
1099 ####################################################################################
1100 ################################### P R I V A T E ##################################
1101 ####################################################################################
1102 """
1103
1104 @staticmethod
1105 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1106 if os.path.exists(filename):
1107 return True
1108 else:
1109 msg = "File {} does not exist".format(filename)
1110 if exception_if_not_exists:
1111 raise K8sException(msg)
1112
1113 @staticmethod
1114 def _remove_multiple_spaces(strobj):
1115 strobj = strobj.strip()
1116 while " " in strobj:
1117 strobj = strobj.replace(" ", " ")
1118 return strobj
1119
1120 @staticmethod
1121 def _output_to_lines(output: str) -> list:
1122 output_lines = list()
1123 lines = output.splitlines(keepends=False)
1124 for line in lines:
1125 line = line.strip()
1126 if len(line) > 0:
1127 output_lines.append(line)
1128 return output_lines
1129
1130 @staticmethod
1131 def _output_to_table(output: str) -> list:
1132 output_table = list()
1133 lines = output.splitlines(keepends=False)
1134 for line in lines:
1135 line = line.replace("\t", " ")
1136 line_list = list()
1137 output_table.append(line_list)
1138 cells = line.split(sep=" ")
1139 for cell in cells:
1140 cell = cell.strip()
1141 if len(cell) > 0:
1142 line_list.append(cell)
1143 return output_table
1144
1145 @staticmethod
1146 def _parse_services(output: str) -> list:
1147 lines = output.splitlines(keepends=False)
1148 services = []
1149 for line in lines:
1150 line = line.replace("\t", " ")
1151 cells = line.split(sep=" ")
1152 if len(cells) > 0 and cells[0].startswith("service/"):
1153 elems = cells[0].split(sep="/")
1154 if len(elems) > 1:
1155 services.append(elems[1])
1156 return services
1157
1158 @staticmethod
1159 def _get_deep(dictionary: dict, members: tuple):
1160 target = dictionary
1161 value = None
1162 try:
1163 for m in members:
1164 value = target.get(m)
1165 if not value:
1166 return None
1167 else:
1168 target = value
1169 except Exception:
1170 pass
1171 return value
1172
1173 # find key:value in several lines
1174 @staticmethod
1175 def _find_in_lines(p_lines: list, p_key: str) -> str:
1176 for line in p_lines:
1177 try:
1178 if line.startswith(p_key + ":"):
1179 parts = line.split(":")
1180 the_value = parts[1].strip()
1181 return the_value
1182 except Exception:
1183 # ignore it
1184 pass
1185 return None
1186
1187 @staticmethod
1188 def _lower_keys_list(input_list: list):
1189 """
1190 Transform the keys in a list of dictionaries to lower case and returns a new list
1191 of dictionaries
1192 """
1193 new_list = []
1194 if input_list:
1195 for dictionary in input_list:
1196 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1197 new_list.append(new_dict)
1198 return new_list
1199
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Execute a command as a local process and wait for it to finish

    The command is not run through a shell: it is split into arguments with
    shlex and executed directly.

    :param command: command line to execute
    :param raise_exception_on_error: raise a K8sException when the command
        ends with a non-zero return code or cannot be executed at all
    :param show_error_log: log the output of the command when it ends with a
        non-zero return code
    :param encode_utf8: return the output as the str() of its UTF-8 encoded
        bytes (i.e. with the b'...' wrapper), with the escaped line feeds
        turned back into real line feeds
    :param env: environment variables added to (or replacing) the ones of
        the current process for the execution of the command
    :return: tuple (output, return code). The output is what the command
        wrote to stderr if it wrote anything there, otherwise what it wrote
        to stdout. ("", -1) is returned if the execution raised an exception
        and raise_exception_on_error is False
    :raises K8sException: see raise_exception_on_error
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # from here on `command` is the list of arguments; the error log at the
    # bottom prints it in that form
    command = shlex.split(command)

    # environment of the child: the current one plus the requested overrides
    environ = os.environ.copy()
    environ.update(env or {})

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for the command to terminate
        captured_out, captured_err = await process.communicate()
        return_code = process.returncode

        # stderr goes last so that it takes precedence over stdout
        output = ""
        for captured in (captured_out, captured_err):
            if captured:
                output = captured.decode("utf-8").strip()

        failed = return_code != 0
        if failed and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if failed and raise_exception_on_error:
            raise K8sException(output)

        if encode_utf8:
            output = str(output.encode("utf-8").strip()).replace("\\n", "\n")

        return output, return_code

    except (asyncio.CancelledError, K8sException):
        # never converted nor swallowed
        raise
    except Exception as e:
        self.log.error("Exception executing command: {} -> {}".format(command, e))
        if not raise_exception_on_error:
            return "", -1
        raise K8sException(e) from e
1269
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Execute "command1 | command2" as local processes and wait for command2

    No shell is involved: both commands are split into arguments with shlex
    and connected through an OS pipe. Only the return code and the stdout of
    command2 are taken into account; the return code of command1 is not
    checked and the stderr of both commands is not captured.

    :param command1: command line whose stdout feeds command2
    :param command2: command line that reads from command1
    :param raise_exception_on_error: raise a K8sException when command2 ends
        with a non-zero return code or the commands cannot be executed
    :param show_error_log: log the output when command2 ends with a non-zero
        return code
    :param encode_utf8: return the output as the str() of its UTF-8 encoded
        bytes (i.e. with the b'...' wrapper), with the escaped line feeds
        turned back into real line feeds
    :param env: environment variables added to (or replacing) the ones of
        the current process for the execution of both commands
    :return: tuple (stdout of command2, return code of command2), or
        ("", -1) if the execution raised an exception and
        raise_exception_on_error is False
    :raises K8sException: see raise_exception_on_error
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read, write = os.pipe()
        try:
            try:
                await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
            finally:
                # The child owns its own copy of the write end. Ours must be
                # closed (also if the spawn failed) so that the descriptor is
                # not leaked and command2 can see the end of the input.
                os.close(write)
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
            )
        finally:
            # same for the read end, whatever happened above
            os.close(read)

        # stderr is not piped, so only stdout can be collected here
        stdout, _ = await process_2.communicate()

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1341
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The data is read with "kubectl get service <name> -o=yaml".

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; empty while no ip is assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None while the load balancer has no ingress
        # assigned yet, and an ingress entry may carry only a hostname;
        # neither case may break the query of the service.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if "ip" in elem
        ]

    return service
1385
async def _exec_inspect_comand(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Obtains information about a kdu, no cluster (no env)

    :param inspect_command: inspect sub-command, handed over untouched to
        _get_inspect_command
    :param kdu_model: chart reference. Everything up to the first "/" is
        discarded and, when exactly one ":" remains, the text after it is
        used as the chart version
    :param repo_url: url of the repository of the chart, if any
    :return: output of the command, as returned by _local_async_exec with
        encode_utf8=True
    """

    repo_str = " --repo {}".format(repo_url) if repo_url else ""

    # discard the prefix that ends at the first "/", if there is one
    _prefix, slash, remainder = kdu_model.partition("/")
    if slash:
        kdu_model = remainder

    # "<chart>:<version>" -> chart plus version option
    version = ""
    if kdu_model.count(":") == 1:
        kdu_model, chart_version = kdu_model.split(sep=":")
        version = "--version {}".format(str(chart_version))

    full_command = self._get_inspect_command(
        inspect_command, kdu_model, repo_str, version
    )
    output, _rc = await self._local_async_exec(
        command=full_command, encode_utf8=True
    )

    return output
1417
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Poll the status of a KDU instance and write it to the database

    Every iteration sleeps check_every seconds, reads the status through
    _status_kdu and stores it through write_app_status_to_db. The loop ends
    when the write reports a failure, when the task is cancelled or, if
    run_once is set, after the first iteration. Any other error is logged
    and the polling goes on.

    :param cluster_id: cluster passed to _status_kdu
    :param operation: operation name written together with the status
    :param kdu_instance: instance whose status is polled
    :param namespace: namespace passed to _status_kdu
    :param check_every: seconds to sleep before each poll (also before the
        first one)
    :param db_dict: passed untouched to write_app_status_to_db
    :param run_once: execute a single iteration instead of polling forever
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # best effort: a failed poll must not stop the task
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )

        # Checked after the try statement rather than in a "finally" clause:
        # a return inside "finally" would silently discard any exception not
        # handled above (KeyboardInterrupt, SystemExit...).
        if run_once:
            return
1460
1461 # params for use in -f file
1462 # returns values file option and filename (in order to delete it at the end)
1463 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1464
1465 if params and len(params) > 0:
1466 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1467
1468 def get_random_number():
1469 r = random.randrange(start=1, stop=99999999)
1470 s = str(r)
1471 while len(s) < 10:
1472 s = "0" + s
1473 return s
1474
1475 params2 = dict()
1476 for key in params:
1477 value = params.get(key)
1478 if "!!yaml" in str(value):
1479 value = yaml.load(value[7:])
1480 params2[key] = value
1481
1482 values_file = get_random_number() + ".yaml"
1483 with open(values_file, "w") as stream:
1484 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1485
1486 return "-f {}".format(values_file), values_file
1487
1488 return "", None
1489
1490 # params for use in --set option
1491 @staticmethod
1492 def _params_to_set_option(params: dict) -> str:
1493 params_str = ""
1494 if params and len(params) > 0:
1495 start = True
1496 for key in params:
1497 value = params.get(key, None)
1498 if value is not None:
1499 if start:
1500 params_str += "--set "
1501 start = False
1502 else:
1503 params_str += ","
1504 params_str += "{}={}".format(key, value)
1505 return params_str
1506
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Generate a name for a new KDU instance out of the name of its chart

    The name is built from the chart name (for an absolute path or a URL,
    its last component) with every character that is neither a letter nor a
    digit replaced by "-", truncated to 35 characters and prefixed with "a"
    if it does not start with a letter. A "-" and a random 10-digit number
    are appended and the result is lower-cased.

    :param kwargs: must contain "kdu_model", the chart reference
    :return: the generated name
    :raises KeyError: if "kdu_model" is not provided
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last component.
    # Trailing slashes are dropped first, otherwise that component is empty.
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )
    name = name[0:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    # random number left-padded with zeros up to 10 digits
    suffix = str(random.randrange(1, 99999999)).rjust(10, "0")

    return "{}-{}".format(name, suffix).lower()