Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import os
30 import yaml
31 from uuid import uuid4
32
33 from n2vc.config import EnvironConfig
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45
46 service_account = "osm"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Store the common state needed by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :raises K8sException: if the kubectl or the helm executable does not exist
    """

    # parent class (self.log is used right after this call)
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # random numbers for release name generation
    random.seed(time.time())

    # the file system
    self.fs = fs

    # both executables are mandatory: fail early if any of them is missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url from config; the literal string "None" means "not set"
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
91
92 @staticmethod
93 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
94 """
95 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
96 cluster_id for backward compatibility
97 """
98 namespace, _, cluster_id = cluster_uuid.rpartition(":")
99 return namespace, cluster_id
100
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse. If it contains a
        namespace ('namespace:cluster_id') that namespace takes precedence
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster ('namespace:cluster_id') and the value
        returned by the helm version specific _cluster_init
        (on error, an exception will be raised)
    """

    if reuse_cluster_uuid:
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The credentials must never be readable by other users. The third
    # positional argument of the builtin open() is "buffering", not the file
    # permissions, so the file is created through os.open() with the
    # restricted mode instead.
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(
        paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode
    )
    with os.fdopen(fd, "w") as f:
        # os.open() only applies the mode to newly created files: restrict a
        # pre-existing file before the credentials are written into it
        os.chmod(paths["kube_config"], mode)
        f.write(k8s_creds)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
150
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Add a repository to the local helm configuration of a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name to assign to the repository
    :param url: url of the repository
    :param repo_type: type of repository; here it is only written to the log
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # helm repo update (a failure here is requested not to raise)
    command = "env KUBECONFIG={} {} repo update".format(
        paths["kube_config"], self._helm_command
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    # name and url are externally provided values: quote them so that blanks
    # or shell metacharacters cannot turn into additional arguments
    command = "env KUBECONFIG={} {} repo add {} {}".format(
        paths["kube_config"],
        self._helm_command,
        shlex.quote(name),
        shlex.quote(url),
    )
    self.log.debug("adding repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
189
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: list with the parsed yaml output of "helm repo list" after
        passing it through _lower_keys_list; an empty list when the command
        fails or prints nothing
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    list_command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # Do not raise on error: if there are no repos just want an empty list
    output, _rc = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    if _rc != 0 or not output:
        return []

    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed_repos)
229
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the local helm configuration of a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name of the repository to remove
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the name is an externally provided value: quote it so that blanks or
    # shell metacharacters cannot turn into additional arguments
    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
252
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
        before the helm software is removed
    :param uninstall_sw: Boolean to request the removal of the helm software
        from the cluster. It is ignored when the cluster still has releases and
        force is False
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_id, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # defined before the try so that the except clause can
                    # always reference it
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                # Allow to remove k8s cluster without removing Tiller
                uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Also remove the local directory if it still exists
    direct = self.fs.path + "/" + cluster_id
    shutil.rmtree(direct, ignore_errors=True)

    return True
324
async def _install_impl(
    self,
    cluster_id: str,
    kdu_model: str,
    paths: dict,
    env: dict,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Run the helm install command and keep the stored status updated

    :param cluster_id: cluster identifier (without namespace)
    :param kdu_model: chart reference, optionally followed by ':version'
    :param paths: recomputed inside through _init_paths_env (the received
        value is not used)
    :param env: recomputed inside through _init_paths_env (the received value
        is not used)
    :param kdu_instance: name of the release to install
    :param atomic: when True the status is refreshed by a parallel task while
        the helm command runs
    :param timeout: passed to the install command builder
    :param params: values for the chart, converted by _params_to_file_option
    :param db_dict: passed to _store_status
    :param kdu_name: not used here
    :param namespace: namespace for the release
    :raises K8sException: if the helm command returns a non-zero code
    """
    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # always stop the status task, also when this coroutine is
                # cancelled while waiting; otherwise it would keep running
                status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also on error or cancellation
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="install",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)
423
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing release and return its new revision number

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference, optionally followed by ':version'. When
        it is None it is passed as is to the upgrade command builder
    :param atomic: when True the status is refreshed by a parallel task while
        the helm command runs
    :param timeout: passed to the upgrade command builder
    :param params: values for the chart, converted by _params_to_file_option
    :param db_dict: passed to _store_status
    :return: revision of the release after the upgrade, 0 if the release is
        not found afterwards
    :raises K8sException: if the release does not exist or the helm command
        returns a non-zero code
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version (kdu_model defaults to None: do not fail with a TypeError)
        version = None
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # always stop the status task, also when this coroutine is
                # cancelled while waiting; otherwise it would keep running
                status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also on error or cancellation
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
545
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scaling is not implemented in this base connector.

    :param kdu_instance: not used
    :param scale: not used
    :param resource_name: not used
    :param total_timeout: not used
    :param kwargs: not used
    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
555
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Obtaining the scale count is not implemented in this base connector.

    :param resource_name: not used
    :param kdu_instance: not used
    :param kwargs: not used
    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
563
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll back a release to a given revision and return its new revision number

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to roll back
    :param revision: revision passed to the rollback command builder
    :param db_dict: passed to _store_status
    :return: revision of the release after the rollback, 0 if the release is
        not found afterwards
    :raises K8sException: if the release does not exist or the helm command
        returns a non-zero code
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    command = self._get_rollback_command(
        kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
    )

    self.log.debug("rolling_back: {}".format(command))

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    try:
        # wait for execution task
        await asyncio.wait([exec_task])
    finally:
        # always stop the status task, also when this coroutine is cancelled
        # while waiting; otherwise it would keep running
        status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
652
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance through the helm version specific
    uninstall command (this call should happen after all
    _terminate-config-primitive_ of the VNF are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True if the instance is not found (nothing to do); otherwise the
        output of the command converted by _output_to_table
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the instance is needed to obtain its namespace
    release = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release:
        # a missing instance is not an error
        self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    uninstall_command = self._get_uninstall_command(
        kdu_instance, release["namespace"], paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=uninstall_command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(output)
697
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Returns the deployed releases in a cluster, as provided by the helm
    version specific _instances_list

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the value returned by _instances_list for the cluster
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # local dir is synchronized before and after the internal command
    self.fs.sync(from_path=cluster_id)
    releases = await self._instances_list(cluster_id)
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
719
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look for a release by name among the releases of a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to look for
    :return: the first entry of instances_list whose "name" is kdu_instance,
        None when there is no such entry
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    match = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if match is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return match
727
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Primitives are not supported for KDUs deployed with helm, so this method
    never returns: it always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    unsupported_msg = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported_msg)
754
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    The names of the services are obtained with _get_services and then the
    data of each one is obtained, one after another, with _get_service.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: list with the value returned by _get_service for every service
        of the kdu instance
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # get list of services names for kdu
    names = await self._get_services(
        cluster_id, kdu_instance, namespace, paths["kube_config"]
    )

    # sequential on purpose: one query per service, in order
    service_list = [
        await self._get_service(cluster_id, name, namespace) for name in names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return service_list
803
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of a service of the cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: the value returned by _get_service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # local dir is synchronized before and after the query
    self.fs.sync(from_path=cluster_id)
    service_data = await self._get_service(cluster_id, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_id)

    return service_data
825
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    Retrieve the current state of a given KDU instance, based on the helm
    `status` call implemented by _status_kdu.

    The instance is first looked up in the list of releases of the cluster in
    order to obtain its namespace.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: the value returned by _status_kdu when it is called with
        show_error_log=True and return_text=True
    :raises K8sException: if the instance is not found in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # get instance: needed to obtain namespace
    releases = await self._instances_list(cluster_id=cluster_id)
    release = next(
        (item for item in releases if item.get("name") == kdu_instance), None
    )
    if release is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_id
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return status
886
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a kdu model

    :param kdu_model: chart reference to inspect
    :param repo_url: optional url of the repository of the chart
    :return: the result of the "values" inspect command
    """

    debug_msg = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    result = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return result
898
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a kdu model

    :param kdu_model: chart reference to inspect
    :param repo_url: optional url of the repository of the chart
    :return: the result of the "readme" inspect command
    """

    debug_msg = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    result = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return result
908
async def synchronize_repos(self, cluster_uuid: str):
    """
    Make the helm repositories of a cluster match the ones stored at database

    Repositories of the database that are missing locally, or whose url has
    changed, are (re)added. Local repositories that are not in the database
    are removed, except the one called "stable".

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: tuple (deleted_repo_list, added_repo_dict). deleted_repo_list
        contains the database id of the repos removed because of an url change
        and the name of the local repos removed because they are not in the
        database; added_repo_dict maps database id to name of the added repos
    :raises K8sException: if a repository cannot be added
    :raises Exception: on any other error
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # defined before the try so that the except clause can always
            # reference it
            repo_id = None
            try:
                repo_id = db_repo.get("_id")
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and add again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # a repo that cannot be removed must not abort the
                    # synchronization: it is only reported in the log
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # Any other error is logged and raised as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
974
def _get_db_repos_dict(self, repo_ids: list):
    """
    Read the repositories from the "k8srepos" collection of the database

    :param repo_ids: list of repository identifiers ("_id")
    :return: dictionary that maps the name of each repository to the content
        read from database
    """
    records = (self.db.get_one("k8srepos", {"_id": ident}) for ident in repo_ids)
    return {record["name"]: record for record in records}
981
982 """
983 ####################################################################################
984 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
985 ####################################################################################
986 """
987
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    To be implemented by each helm version: creates and returns the base
    cluster and kube dirs. For helm3 the directories of the new directory
    specification are also created; those paths are not returned but assigned
    to helm environment variables.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably, create the directories when they
        do not exist yet (defined by the implementation)
    :return: Dictionary with config_paths (callers read its "kube_config"
        entry) and dictionary with helm environment variables
    """
998
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Helm version dependent initialization of the cluster.

    The returned value is handed back by init_env as its second result
    (whether the connector has installed some software in the cluster).
    """
1004
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Helm version dependent listing of the helm instances of a cluster.

    Callers iterate the result and read the "name" and "namespace" entries of
    each item.
    """
1010
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Helm version dependent method to obtain the services of a helm instance.

    Each item of the result is used by get_services as a service name.
    """
1016
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Helm version dependent method to obtain the status of a helm instance.

    :param cluster_id: cluster identifier (without namespace)
    :param kdu_instance: name of the helm instance
    :param namespace: namespace of the helm instance
    :param show_error_log: meaning defined by the implementation
    :param return_text: meaning defined by the implementation
    """
1029
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance
    """
1045
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Build the command that upgrades the indicated instance.

    :return: command to be executed
    """
1061
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Build the command that rolls back the indicated instance.

    :return: command to be executed
    """
1069
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Build the command that deletes the indicated instance.

    :return: command to be executed
    """
1077
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Build the command that obtains information about the kdu.

    :return: command to be executed
    """
1085
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Uninstall the cluster software of helm. What is done depends on the helm
    version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called
    """
1094
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtain the identifiers of the repos of the cluster.

    The result is used by synchronize_repos as the list of "_id" values to
    read from the database.
    """
1100
1101 """
1102 ####################################################################################
1103 ################################### P R I V A T E ##################################
1104 ####################################################################################
1105 """
1106
1107 @staticmethod
1108 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1109 if os.path.exists(filename):
1110 return True
1111 else:
1112 msg = "File {} does not exist".format(filename)
1113 if exception_if_not_exists:
1114 raise K8sException(msg)
1115
1116 @staticmethod
1117 def _remove_multiple_spaces(strobj):
1118 strobj = strobj.strip()
1119 while " " in strobj:
1120 strobj = strobj.replace(" ", " ")
1121 return strobj
1122
1123 @staticmethod
1124 def _output_to_lines(output: str) -> list:
1125 output_lines = list()
1126 lines = output.splitlines(keepends=False)
1127 for line in lines:
1128 line = line.strip()
1129 if len(line) > 0:
1130 output_lines.append(line)
1131 return output_lines
1132
1133 @staticmethod
1134 def _output_to_table(output: str) -> list:
1135 output_table = list()
1136 lines = output.splitlines(keepends=False)
1137 for line in lines:
1138 line = line.replace("\t", " ")
1139 line_list = list()
1140 output_table.append(line_list)
1141 cells = line.split(sep=" ")
1142 for cell in cells:
1143 cell = cell.strip()
1144 if len(cell) > 0:
1145 line_list.append(cell)
1146 return output_table
1147
1148 @staticmethod
1149 def _parse_services(output: str) -> list:
1150 lines = output.splitlines(keepends=False)
1151 services = []
1152 for line in lines:
1153 line = line.replace("\t", " ")
1154 cells = line.split(sep=" ")
1155 if len(cells) > 0 and cells[0].startswith("service/"):
1156 elems = cells[0].split(sep="/")
1157 if len(elems) > 1:
1158 services.append(elems[1])
1159 return services
1160
1161 @staticmethod
1162 def _get_deep(dictionary: dict, members: tuple):
1163 target = dictionary
1164 value = None
1165 try:
1166 for m in members:
1167 value = target.get(m)
1168 if not value:
1169 return None
1170 else:
1171 target = value
1172 except Exception:
1173 pass
1174 return value
1175
1176 # find key:value in several lines
1177 @staticmethod
1178 def _find_in_lines(p_lines: list, p_key: str) -> str:
1179 for line in p_lines:
1180 try:
1181 if line.startswith(p_key + ":"):
1182 parts = line.split(":")
1183 the_value = parts[1].strip()
1184 return the_value
1185 except Exception:
1186 # ignore it
1187 pass
1188 return None
1189
1190 @staticmethod
1191 def _lower_keys_list(input_list: list):
1192 """
1193 Transform the keys in a list of dictionaries to lower case and returns a new list
1194 of dictionaries
1195 """
1196 new_list = []
1197 if input_list:
1198 for dictionary in input_list:
1199 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1200 new_list.append(new_dict)
1201 return new_list
1202
1203 async def _local_async_exec(
1204 self,
1205 command: str,
1206 raise_exception_on_error: bool = False,
1207 show_error_log: bool = True,
1208 encode_utf8: bool = False,
1209 env: dict = None,
1210 ) -> (str, int):
1211
1212 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1213 self.log.debug(
1214 "Executing async local command: {}, env: {}".format(command, env)
1215 )
1216
1217 # split command
1218 command = shlex.split(command)
1219
1220 environ = os.environ.copy()
1221 if env:
1222 environ.update(env)
1223
1224 try:
1225 process = await asyncio.create_subprocess_exec(
1226 *command,
1227 stdout=asyncio.subprocess.PIPE,
1228 stderr=asyncio.subprocess.PIPE,
1229 env=environ,
1230 )
1231
1232 # wait for command terminate
1233 stdout, stderr = await process.communicate()
1234
1235 return_code = process.returncode
1236
1237 output = ""
1238 if stdout:
1239 output = stdout.decode("utf-8").strip()
1240 # output = stdout.decode()
1241 if stderr:
1242 output = stderr.decode("utf-8").strip()
1243 # output = stderr.decode()
1244
1245 if return_code != 0 and show_error_log:
1246 self.log.debug(
1247 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1248 )
1249 else:
1250 self.log.debug("Return code: {}".format(return_code))
1251
1252 if raise_exception_on_error and return_code != 0:
1253 raise K8sException(output)
1254
1255 if encode_utf8:
1256 output = output.encode("utf-8").strip()
1257 output = str(output).replace("\\n", "\n")
1258
1259 return output, return_code
1260
1261 except asyncio.CancelledError:
1262 raise
1263 except K8sException:
1264 raise
1265 except Exception as e:
1266 msg = "Exception executing command: {} -> {}".format(command, e)
1267 self.log.error(msg)
1268 if raise_exception_on_error:
1269 raise K8sException(e) from e
1270 else:
1271 return "", -1
1272
1273 async def _local_async_exec_pipe(
1274 self,
1275 command1: str,
1276 command2: str,
1277 raise_exception_on_error: bool = True,
1278 show_error_log: bool = True,
1279 encode_utf8: bool = False,
1280 env: dict = None,
1281 ):
1282
1283 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1284 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1285 command = "{} | {}".format(command1, command2)
1286 self.log.debug(
1287 "Executing async local command: {}, env: {}".format(command, env)
1288 )
1289
1290 # split command
1291 command1 = shlex.split(command1)
1292 command2 = shlex.split(command2)
1293
1294 environ = os.environ.copy()
1295 if env:
1296 environ.update(env)
1297
1298 try:
1299 read, write = os.pipe()
1300 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1301 os.close(write)
1302 process_2 = await asyncio.create_subprocess_exec(
1303 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1304 )
1305 os.close(read)
1306 stdout, stderr = await process_2.communicate()
1307
1308 return_code = process_2.returncode
1309
1310 output = ""
1311 if stdout:
1312 output = stdout.decode("utf-8").strip()
1313 # output = stdout.decode()
1314 if stderr:
1315 output = stderr.decode("utf-8").strip()
1316 # output = stderr.decode()
1317
1318 if return_code != 0 and show_error_log:
1319 self.log.debug(
1320 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1321 )
1322 else:
1323 self.log.debug("Return code: {}".format(return_code))
1324
1325 if raise_exception_on_error and return_code != 0:
1326 raise K8sException(output)
1327
1328 if encode_utf8:
1329 output = output.encode("utf-8").strip()
1330 output = str(output).replace("\\n", "\n")
1331
1332 return output, return_code
1333 except asyncio.CancelledError:
1334 raise
1335 except K8sException:
1336 raise
1337 except Exception as e:
1338 msg = "Exception executing command: {} -> {}".format(command, e)
1339 self.log.error(msg)
1340 if raise_exception_on_error:
1341 raise K8sException(e) from e
1342 else:
1343 return "", -1
1344
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; it is an empty list while the
    load balancer has not reported any ip
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None when there is no ingress information (e.g.
        # address still pending), and an ingress entry is not guaranteed to
        # carry an "ip" (it may only have a hostname): both cases must not
        # make the whole query fail.
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        ip_list = [elem["ip"] for elem in ip_map_list or [] if "ip" in elem]
        service["external_ip"] = ip_list

    return service
1388
1389 async def _exec_inspect_comand(
1390 self, inspect_command: str, kdu_model: str, repo_url: str = None
1391 ):
1392 """
1393 Obtains information about a kdu, no cluster (no env)
1394 """
1395
1396 repo_str = ""
1397 if repo_url:
1398 repo_str = " --repo {}".format(repo_url)
1399
1400 idx = kdu_model.find("/")
1401 if idx >= 0:
1402 idx += 1
1403 kdu_model = kdu_model[idx:]
1404
1405 version = ""
1406 if ":" in kdu_model:
1407 parts = kdu_model.split(sep=":")
1408 if len(parts) == 2:
1409 version = "--version {}".format(str(parts[1]))
1410 kdu_model = parts[0]
1411
1412 full_command = self._get_inspect_command(
1413 inspect_command, kdu_model, repo_str, version
1414 )
1415 output, _rc = await self._local_async_exec(
1416 command=full_command, encode_utf8=True
1417 )
1418
1419 return output
1420
1421 async def _store_status(
1422 self,
1423 cluster_id: str,
1424 operation: str,
1425 kdu_instance: str,
1426 namespace: str = None,
1427 check_every: float = 10,
1428 db_dict: dict = None,
1429 run_once: bool = False,
1430 ):
1431 while True:
1432 try:
1433 await asyncio.sleep(check_every)
1434 detailed_status = await self._status_kdu(
1435 cluster_id=cluster_id,
1436 kdu_instance=kdu_instance,
1437 namespace=namespace,
1438 return_text=False,
1439 )
1440 status = detailed_status.get("info").get("description")
1441 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1442 # write status to db
1443 result = await self.write_app_status_to_db(
1444 db_dict=db_dict,
1445 status=str(status),
1446 detailed_status=str(detailed_status),
1447 operation=operation,
1448 )
1449 if not result:
1450 self.log.info("Error writing in database. Task exiting...")
1451 return
1452 except asyncio.CancelledError:
1453 self.log.debug("Task cancelled")
1454 return
1455 except Exception as e:
1456 self.log.debug(
1457 "_store_status exception: {}".format(str(e)), exc_info=True
1458 )
1459 pass
1460 finally:
1461 if run_once:
1462 return
1463
1464 # params for use in -f file
1465 # returns values file option and filename (in order to delete it at the end)
1466 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1467
1468 if params and len(params) > 0:
1469 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1470
1471 def get_random_number():
1472 r = random.randrange(start=1, stop=99999999)
1473 s = str(r)
1474 while len(s) < 10:
1475 s = "0" + s
1476 return s
1477
1478 params2 = dict()
1479 for key in params:
1480 value = params.get(key)
1481 if "!!yaml" in str(value):
1482 value = yaml.load(value[7:])
1483 params2[key] = value
1484
1485 values_file = get_random_number() + ".yaml"
1486 with open(values_file, "w") as stream:
1487 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1488
1489 return "-f {}".format(values_file), values_file
1490
1491 return "", None
1492
1493 # params for use in --set option
1494 @staticmethod
1495 def _params_to_set_option(params: dict) -> str:
1496 params_str = ""
1497 if params and len(params) > 0:
1498 start = True
1499 for key in params:
1500 value = params.get(key, None)
1501 if value is not None:
1502 if start:
1503 params_str += "--set "
1504 start = False
1505 else:
1506 params_str += ","
1507 params_str += "{}={}".format(key, value)
1508 return params_str
1509
1510 @staticmethod
1511 def generate_kdu_instance_name(**kwargs):
1512 chart_name = kwargs["kdu_model"]
1513 # check embeded chart (file or dir)
1514 if chart_name.startswith("/"):
1515 # extract file or directory name
1516 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1517 # check URL
1518 elif "://" in chart_name:
1519 # extract last portion of URL
1520 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1521
1522 name = ""
1523 for c in chart_name:
1524 if c.isalpha() or c.isnumeric():
1525 name += c
1526 else:
1527 name += "-"
1528 if len(name) > 35:
1529 name = name[0:35]
1530
1531 # if does not start with alpha character, prefix 'a'
1532 if not name[0].isalpha():
1533 name = "a" + name
1534
1535 name += "-"
1536
1537 def get_random_number():
1538 r = random.randrange(start=1, stop=99999999)
1539 s = str(r)
1540 s = s.rjust(10, "0")
1541 return s
1542
1543 name = name + get_random_number()
1544 return name.lower()