Fix black issues
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95 """
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
98 """
99 namespace, _, cluster_id = cluster_uuid.rpartition(":")
100 return namespace, cluster_id
101
102 async def init_env(
103 self,
104 k8s_creds: str,
105 namespace: str = "kube-system",
106 reuse_cluster_uuid=None,
107 **kwargs,
108 ) -> (str, bool):
109 """
110 It prepares a given K8s cluster environment to run Charts
111
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
113 '.kube/config'
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
117 :param kwargs: Additional parameters (None yet)
118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
121 """
122
123 if reuse_cluster_uuid:
124 namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
125 namespace = namespace_ or namespace
126 else:
127 cluster_id = str(uuid4())
128 cluster_uuid = "{}:{}".format(namespace, cluster_id)
129
130 self.log.debug(
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
132 )
133
134 paths, env = self._init_paths_env(
135 cluster_name=cluster_id, create_if_not_exist=True
136 )
137 mode = stat.S_IRUSR | stat.S_IWUSR
138 with open(paths["kube_config"], "w", mode) as f:
139 f.write(k8s_creds)
140 os.chmod(paths["kube_config"], 0o600)
141
142 # Code with initialization specific of helm version
143 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
144
145 # sync fs with local data
146 self.fs.reverse_sync(from_path=cluster_id)
147
148 self.log.info("Cluster {} initialized".format(cluster_id))
149
150 return cluster_uuid, n2vc_installed_sw
151
152 async def repo_add(
153 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
154 ):
155 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
156 self.log.debug(
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_id, repo_type, name, url
159 )
160 )
161
162 # init_env
163 paths, env = self._init_paths_env(
164 cluster_name=cluster_id, create_if_not_exist=True
165 )
166
167 # sync local dir
168 self.fs.sync(from_path=cluster_id)
169
170 # helm repo add name url
171 command = "env KUBECONFIG={} {} repo add {} {}".format(
172 paths["kube_config"], self._helm_command, name, url
173 )
174 self.log.debug("adding repo: {}".format(command))
175 await self._local_async_exec(
176 command=command, raise_exception_on_error=True, env=env
177 )
178
179 # helm repo update
180 command = "env KUBECONFIG={} {} repo update {}".format(
181 paths["kube_config"], self._helm_command, name
182 )
183 self.log.debug("updating repo: {}".format(command))
184 await self._local_async_exec(
185 command=command, raise_exception_on_error=False, env=env
186 )
187
188 # sync fs
189 self.fs.reverse_sync(from_path=cluster_id)
190
191 async def repo_update(self, cluster_id: str, name: str, repo_type: str = "chart"):
192 self.log.debug(
193 "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name)
194 )
195
196 # init_env
197 paths, env = self._init_paths_env(
198 cluster_name=cluster_id, create_if_not_exist=True
199 )
200
201 # sync local dir
202 self.fs.sync(from_path=cluster_id)
203
204 # helm repo update
205 command = "{} repo update {}".format(self._helm_command, name)
206 self.log.debug("updating repo: {}".format(command))
207 await self._local_async_exec(
208 command=command, raise_exception_on_error=False, env=env
209 )
210
211 # sync fs
212 self.fs.reverse_sync(from_path=cluster_id)
213
214 async def repo_list(self, cluster_uuid: str) -> list:
215 """
216 Get the list of registered repositories
217
218 :return: list of registered repositories: [ (name, url) .... ]
219 """
220
221 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
222 self.log.debug("list repositories for cluster {}".format(cluster_id))
223
224 # config filename
225 paths, env = self._init_paths_env(
226 cluster_name=cluster_id, create_if_not_exist=True
227 )
228
229 # sync local dir
230 self.fs.sync(from_path=cluster_id)
231
232 command = "env KUBECONFIG={} {} repo list --output yaml".format(
233 paths["kube_config"], self._helm_command
234 )
235
236 # Set exception to false because if there are no repos just want an empty list
237 output, _rc = await self._local_async_exec(
238 command=command, raise_exception_on_error=False, env=env
239 )
240
241 # sync fs
242 self.fs.reverse_sync(from_path=cluster_id)
243
244 if _rc == 0:
245 if output and len(output) > 0:
246 repos = yaml.load(output, Loader=yaml.SafeLoader)
247 # unify format between helm2 and helm3 setting all keys lowercase
248 return self._lower_keys_list(repos)
249 else:
250 return []
251 else:
252 return []
253
254 async def repo_remove(self, cluster_uuid: str, name: str):
255 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
256 self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
257
258 # init env, paths
259 paths, env = self._init_paths_env(
260 cluster_name=cluster_id, create_if_not_exist=True
261 )
262
263 # sync local dir
264 self.fs.sync(from_path=cluster_id)
265
266 command = "env KUBECONFIG={} {} repo remove {}".format(
267 paths["kube_config"], self._helm_command, name
268 )
269 await self._local_async_exec(
270 command=command, raise_exception_on_error=True, env=env
271 )
272
273 # sync fs
274 self.fs.reverse_sync(from_path=cluster_id)
275
276 async def reset(
277 self,
278 cluster_uuid: str,
279 force: bool = False,
280 uninstall_sw: bool = False,
281 **kwargs,
282 ) -> bool:
283 """Reset a cluster
284
285 Resets the Kubernetes cluster by removing the helm deployment that represents it.
286
287 :param cluster_uuid: The UUID of the cluster to reset
288 :param force: Boolean to force the reset
289 :param uninstall_sw: Boolean to force the reset
290 :param kwargs: Additional parameters (None yet)
291 :return: Returns True if successful or raises an exception.
292 """
293 namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
294 self.log.debug(
295 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
296 cluster_id, uninstall_sw
297 )
298 )
299
300 # sync local dir
301 self.fs.sync(from_path=cluster_id)
302
303 # uninstall releases if needed.
304 if uninstall_sw:
305 releases = await self.instances_list(cluster_uuid=cluster_uuid)
306 if len(releases) > 0:
307 if force:
308 for r in releases:
309 try:
310 kdu_instance = r.get("name")
311 chart = r.get("chart")
312 self.log.debug(
313 "Uninstalling {} -> {}".format(chart, kdu_instance)
314 )
315 await self.uninstall(
316 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
317 )
318 except Exception as e:
319 # will not raise exception as it was found
320 # that in some cases of previously installed helm releases it
321 # raised an error
322 self.log.warn(
323 "Error uninstalling release {}: {}".format(
324 kdu_instance, e
325 )
326 )
327 else:
328 msg = (
329 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
330 ).format(cluster_id)
331 self.log.warn(msg)
332 uninstall_sw = (
333 False # Allow to remove k8s cluster without removing Tiller
334 )
335
336 if uninstall_sw:
337 await self._uninstall_sw(cluster_id, namespace)
338
339 # delete cluster directory
340 self.log.debug("Removing directory {}".format(cluster_id))
341 self.fs.file_delete(cluster_id, ignore_non_exist=True)
342 # Remove also local directorio if still exist
343 direct = self.fs.path + "/" + cluster_id
344 shutil.rmtree(direct, ignore_errors=True)
345
346 return True
347
348 async def _install_impl(
349 self,
350 cluster_id: str,
351 kdu_model: str,
352 paths: dict,
353 env: dict,
354 kdu_instance: str,
355 atomic: bool = True,
356 timeout: float = 300,
357 params: dict = None,
358 db_dict: dict = None,
359 kdu_name: str = None,
360 namespace: str = None,
361 ):
362 # init env, paths
363 paths, env = self._init_paths_env(
364 cluster_name=cluster_id, create_if_not_exist=True
365 )
366
367 # params to str
368 params_str, file_to_delete = self._params_to_file_option(
369 cluster_id=cluster_id, params=params
370 )
371
372 # version
373 version = None
374 if ":" in kdu_model:
375 parts = kdu_model.split(sep=":")
376 if len(parts) == 2:
377 version = str(parts[1])
378 kdu_model = parts[0]
379
380 repo = self._split_repo(kdu_model)
381 if repo:
382 await self.repo_update(cluster_id, repo)
383
384 command = self._get_install_command(
385 kdu_model,
386 kdu_instance,
387 namespace,
388 params_str,
389 version,
390 atomic,
391 timeout,
392 paths["kube_config"],
393 )
394
395 self.log.debug("installing: {}".format(command))
396
397 if atomic:
398 # exec helm in a task
399 exec_task = asyncio.ensure_future(
400 coro_or_future=self._local_async_exec(
401 command=command, raise_exception_on_error=False, env=env
402 )
403 )
404
405 # write status in another task
406 status_task = asyncio.ensure_future(
407 coro_or_future=self._store_status(
408 cluster_id=cluster_id,
409 kdu_instance=kdu_instance,
410 namespace=namespace,
411 db_dict=db_dict,
412 operation="install",
413 )
414 )
415
416 # wait for execution task
417 await asyncio.wait([exec_task])
418
419 # cancel status task
420 status_task.cancel()
421
422 output, rc = exec_task.result()
423
424 else:
425 output, rc = await self._local_async_exec(
426 command=command, raise_exception_on_error=False, env=env
427 )
428
429 # remove temporal values yaml file
430 if file_to_delete:
431 os.remove(file_to_delete)
432
433 # write final status
434 await self._store_status(
435 cluster_id=cluster_id,
436 kdu_instance=kdu_instance,
437 namespace=namespace,
438 db_dict=db_dict,
439 operation="install",
440 )
441
442 if rc != 0:
443 msg = "Error executing command: {}\nOutput: {}".format(command, output)
444 self.log.error(msg)
445 raise K8sException(msg)
446
447 async def upgrade(
448 self,
449 cluster_uuid: str,
450 kdu_instance: str,
451 kdu_model: str = None,
452 atomic: bool = True,
453 timeout: float = 300,
454 params: dict = None,
455 db_dict: dict = None,
456 ):
457 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
458 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
459
460 # sync local dir
461 self.fs.sync(from_path=cluster_id)
462
463 # look for instance to obtain namespace
464 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
465 if not instance_info:
466 raise K8sException("kdu_instance {} not found".format(kdu_instance))
467
468 # init env, paths
469 paths, env = self._init_paths_env(
470 cluster_name=cluster_id, create_if_not_exist=True
471 )
472
473 # sync local dir
474 self.fs.sync(from_path=cluster_id)
475
476 # params to str
477 params_str, file_to_delete = self._params_to_file_option(
478 cluster_id=cluster_id, params=params
479 )
480
481 # version
482 version = None
483 if ":" in kdu_model:
484 parts = kdu_model.split(sep=":")
485 if len(parts) == 2:
486 version = str(parts[1])
487 kdu_model = parts[0]
488
489 repo = self._split_repo(kdu_model)
490 if repo:
491 await self.repo_update(cluster_id, repo)
492
493 command = self._get_upgrade_command(
494 kdu_model,
495 kdu_instance,
496 instance_info["namespace"],
497 params_str,
498 version,
499 atomic,
500 timeout,
501 paths["kube_config"],
502 )
503
504 self.log.debug("upgrading: {}".format(command))
505
506 if atomic:
507 # exec helm in a task
508 exec_task = asyncio.ensure_future(
509 coro_or_future=self._local_async_exec(
510 command=command, raise_exception_on_error=False, env=env
511 )
512 )
513 # write status in another task
514 status_task = asyncio.ensure_future(
515 coro_or_future=self._store_status(
516 cluster_id=cluster_id,
517 kdu_instance=kdu_instance,
518 namespace=instance_info["namespace"],
519 db_dict=db_dict,
520 operation="upgrade",
521 )
522 )
523
524 # wait for execution task
525 await asyncio.wait([exec_task])
526
527 # cancel status task
528 status_task.cancel()
529 output, rc = exec_task.result()
530
531 else:
532 output, rc = await self._local_async_exec(
533 command=command, raise_exception_on_error=False, env=env
534 )
535
536 # remove temporal values yaml file
537 if file_to_delete:
538 os.remove(file_to_delete)
539
540 # write final status
541 await self._store_status(
542 cluster_id=cluster_id,
543 kdu_instance=kdu_instance,
544 namespace=instance_info["namespace"],
545 db_dict=db_dict,
546 operation="upgrade",
547 )
548
549 if rc != 0:
550 msg = "Error executing command: {}\nOutput: {}".format(command, output)
551 self.log.error(msg)
552 raise K8sException(msg)
553
554 # sync fs
555 self.fs.reverse_sync(from_path=cluster_id)
556
557 # return new revision number
558 instance = await self.get_instance_info(
559 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
560 )
561 if instance:
562 revision = int(instance.get("revision"))
563 self.log.debug("New revision: {}".format(revision))
564 return revision
565 else:
566 return 0
567
568 async def scale(
569 self,
570 kdu_instance: str,
571 scale: int,
572 resource_name: str,
573 total_timeout: float = 1800,
574 **kwargs,
575 ):
576 raise NotImplementedError("Method not implemented")
577
578 async def get_scale_count(
579 self,
580 resource_name: str,
581 kdu_instance: str,
582 **kwargs,
583 ):
584 raise NotImplementedError("Method not implemented")
585
586 async def rollback(
587 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
588 ):
589 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
590 self.log.debug(
591 "rollback kdu_instance {} to revision {} from cluster {}".format(
592 kdu_instance, revision, cluster_id
593 )
594 )
595
596 # sync local dir
597 self.fs.sync(from_path=cluster_id)
598
599 # look for instance to obtain namespace
600 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
601 if not instance_info:
602 raise K8sException("kdu_instance {} not found".format(kdu_instance))
603
604 # init env, paths
605 paths, env = self._init_paths_env(
606 cluster_name=cluster_id, create_if_not_exist=True
607 )
608
609 # sync local dir
610 self.fs.sync(from_path=cluster_id)
611
612 command = self._get_rollback_command(
613 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
614 )
615
616 self.log.debug("rolling_back: {}".format(command))
617
618 # exec helm in a task
619 exec_task = asyncio.ensure_future(
620 coro_or_future=self._local_async_exec(
621 command=command, raise_exception_on_error=False, env=env
622 )
623 )
624 # write status in another task
625 status_task = asyncio.ensure_future(
626 coro_or_future=self._store_status(
627 cluster_id=cluster_id,
628 kdu_instance=kdu_instance,
629 namespace=instance_info["namespace"],
630 db_dict=db_dict,
631 operation="rollback",
632 )
633 )
634
635 # wait for execution task
636 await asyncio.wait([exec_task])
637
638 # cancel status task
639 status_task.cancel()
640
641 output, rc = exec_task.result()
642
643 # write final status
644 await self._store_status(
645 cluster_id=cluster_id,
646 kdu_instance=kdu_instance,
647 namespace=instance_info["namespace"],
648 db_dict=db_dict,
649 operation="rollback",
650 )
651
652 if rc != 0:
653 msg = "Error executing command: {}\nOutput: {}".format(command, output)
654 self.log.error(msg)
655 raise K8sException(msg)
656
657 # sync fs
658 self.fs.reverse_sync(from_path=cluster_id)
659
660 # return new revision number
661 instance = await self.get_instance_info(
662 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
663 )
664 if instance:
665 revision = int(instance.get("revision"))
666 self.log.debug("New revision: {}".format(revision))
667 return revision
668 else:
669 return 0
670
671 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
672 """
673 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
674 (this call should happen after all _terminate-config-primitive_ of the VNF
675 are invoked).
676
677 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
678 :param kdu_instance: unique name for the KDU instance to be deleted
679 :param kwargs: Additional parameters (None yet)
680 :return: True if successful
681 """
682
683 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
684 self.log.debug(
685 "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
686 )
687
688 # sync local dir
689 self.fs.sync(from_path=cluster_id)
690
691 # look for instance to obtain namespace
692 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
693 if not instance_info:
694 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
695 return True
696 # init env, paths
697 paths, env = self._init_paths_env(
698 cluster_name=cluster_id, create_if_not_exist=True
699 )
700
701 # sync local dir
702 self.fs.sync(from_path=cluster_id)
703
704 command = self._get_uninstall_command(
705 kdu_instance, instance_info["namespace"], paths["kube_config"]
706 )
707 output, _rc = await self._local_async_exec(
708 command=command, raise_exception_on_error=True, env=env
709 )
710
711 # sync fs
712 self.fs.reverse_sync(from_path=cluster_id)
713
714 return self._output_to_table(output)
715
716 async def instances_list(self, cluster_uuid: str) -> list:
717 """
718 returns a list of deployed releases in a cluster
719
720 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
721 :return:
722 """
723
724 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
725 self.log.debug("list releases for cluster {}".format(cluster_id))
726
727 # sync local dir
728 self.fs.sync(from_path=cluster_id)
729
730 # execute internal command
731 result = await self._instances_list(cluster_id)
732
733 # sync fs
734 self.fs.reverse_sync(from_path=cluster_id)
735
736 return result
737
738 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
739 instances = await self.instances_list(cluster_uuid=cluster_uuid)
740 for instance in instances:
741 if instance.get("name") == kdu_instance:
742 return instance
743 self.log.debug("Instance {} not found".format(kdu_instance))
744 return None
745
746 async def exec_primitive(
747 self,
748 cluster_uuid: str = None,
749 kdu_instance: str = None,
750 primitive_name: str = None,
751 timeout: float = 300,
752 params: dict = None,
753 db_dict: dict = None,
754 **kwargs,
755 ) -> str:
756 """Exec primitive (Juju action)
757
758 :param cluster_uuid: The UUID of the cluster or namespace:cluster
759 :param kdu_instance: The unique name of the KDU instance
760 :param primitive_name: Name of action that will be executed
761 :param timeout: Timeout for action execution
762 :param params: Dictionary of all the parameters needed for the action
763 :db_dict: Dictionary for any additional data
764 :param kwargs: Additional parameters (None yet)
765
766 :return: Returns the output of the action
767 """
768 raise K8sException(
769 "KDUs deployed with Helm don't support actions "
770 "different from rollback, upgrade and status"
771 )
772
773 async def get_services(
774 self, cluster_uuid: str, kdu_instance: str, namespace: str
775 ) -> list:
776 """
777 Returns a list of services defined for the specified kdu instance.
778
779 :param cluster_uuid: UUID of a K8s cluster known by OSM
780 :param kdu_instance: unique name for the KDU instance
781 :param namespace: K8s namespace used by the KDU instance
782 :return: If successful, it will return a list of services, Each service
783 can have the following data:
784 - `name` of the service
785 - `type` type of service in the k8 cluster
786 - `ports` List of ports offered by the service, for each port includes at least
787 name, port, protocol
788 - `cluster_ip` Internal ip to be used inside k8s cluster
789 - `external_ip` List of external ips (in case they are available)
790 """
791
792 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
793 self.log.debug(
794 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
795 cluster_uuid, kdu_instance
796 )
797 )
798
799 # init env, paths
800 paths, env = self._init_paths_env(
801 cluster_name=cluster_id, create_if_not_exist=True
802 )
803
804 # sync local dir
805 self.fs.sync(from_path=cluster_id)
806
807 # get list of services names for kdu
808 service_names = await self._get_services(
809 cluster_id, kdu_instance, namespace, paths["kube_config"]
810 )
811
812 service_list = []
813 for service in service_names:
814 service = await self._get_service(cluster_id, service, namespace)
815 service_list.append(service)
816
817 # sync fs
818 self.fs.reverse_sync(from_path=cluster_id)
819
820 return service_list
821
822 async def get_service(
823 self, cluster_uuid: str, service_name: str, namespace: str
824 ) -> object:
825 self.log.debug(
826 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
827 service_name, namespace, cluster_uuid
828 )
829 )
830
831 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
832
833 # sync local dir
834 self.fs.sync(from_path=cluster_id)
835
836 service = await self._get_service(cluster_id, service_name, namespace)
837
838 # sync fs
839 self.fs.reverse_sync(from_path=cluster_id)
840
841 return service
842
843 async def status_kdu(
844 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
845 ) -> Union[str, dict]:
846 """
847 This call would retrieve tha current state of a given KDU instance. It would be
848 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
849 values_ of the configuration parameters applied to a given instance. This call
850 would be based on the `status` call.
851
852 :param cluster_uuid: UUID of a K8s cluster known by OSM
853 :param kdu_instance: unique name for the KDU instance
854 :param kwargs: Additional parameters (None yet)
855 :param yaml_format: if the return shall be returned as an YAML string or as a
856 dictionary
857 :return: If successful, it will return the following vector of arguments:
858 - K8s `namespace` in the cluster where the KDU lives
859 - `state` of the KDU instance. It can be:
860 - UNKNOWN
861 - DEPLOYED
862 - DELETED
863 - SUPERSEDED
864 - FAILED or
865 - DELETING
866 - List of `resources` (objects) that this release consists of, sorted by kind,
867 and the status of those resources
868 - Last `deployment_time`.
869
870 """
871 self.log.debug(
872 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
873 cluster_uuid, kdu_instance
874 )
875 )
876
877 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
878
879 # sync local dir
880 self.fs.sync(from_path=cluster_id)
881
882 # get instance: needed to obtain namespace
883 instances = await self._instances_list(cluster_id=cluster_id)
884 for instance in instances:
885 if instance.get("name") == kdu_instance:
886 break
887 else:
888 # instance does not exist
889 raise K8sException(
890 "Instance name: {} not found in cluster: {}".format(
891 kdu_instance, cluster_id
892 )
893 )
894
895 status = await self._status_kdu(
896 cluster_id=cluster_id,
897 kdu_instance=kdu_instance,
898 namespace=instance["namespace"],
899 yaml_format=yaml_format,
900 show_error_log=True,
901 )
902
903 # sync fs
904 self.fs.reverse_sync(from_path=cluster_id)
905
906 return status
907
908 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
909 self.log.debug(
910 "inspect kdu_model values {} from (optional) repo: {}".format(
911 kdu_model, repo_url
912 )
913 )
914
915 return await self._exec_inspect_comand(
916 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
917 )
918
919 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
920 self.log.debug(
921 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
922 )
923
924 return await self._exec_inspect_comand(
925 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
926 )
927
928 async def synchronize_repos(self, cluster_uuid: str):
929 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
930 try:
931 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
932 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
933
934 local_repo_list = await self.repo_list(cluster_uuid)
935 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
936
937 deleted_repo_list = []
938 added_repo_dict = {}
939
940 # iterate over the list of repos in the database that should be
941 # added if not present
942 for repo_name, db_repo in db_repo_dict.items():
943 try:
944 # check if it is already present
945 curr_repo_url = local_repo_dict.get(db_repo["name"])
946 repo_id = db_repo.get("_id")
947 if curr_repo_url != db_repo["url"]:
948 if curr_repo_url:
949 self.log.debug(
950 "repo {} url changed, delete and and again".format(
951 db_repo["url"]
952 )
953 )
954 await self.repo_remove(cluster_uuid, db_repo["name"])
955 deleted_repo_list.append(repo_id)
956
957 # add repo
958 self.log.debug("add repo {}".format(db_repo["name"]))
959 await self.repo_add(
960 cluster_uuid, db_repo["name"], db_repo["url"]
961 )
962 added_repo_dict[repo_id] = db_repo["name"]
963 except Exception as e:
964 raise K8sException(
965 "Error adding repo id: {}, err_msg: {} ".format(
966 repo_id, repr(e)
967 )
968 )
969
970 # Delete repos that are present but not in nbi_list
971 for repo_name in local_repo_dict:
972 if not db_repo_dict.get(repo_name) and repo_name != "stable":
973 self.log.debug("delete repo {}".format(repo_name))
974 try:
975 await self.repo_remove(cluster_uuid, repo_name)
976 deleted_repo_list.append(repo_name)
977 except Exception as e:
978 self.warning(
979 "Error deleting repo, name: {}, err_msg: {}".format(
980 repo_name, str(e)
981 )
982 )
983
984 return deleted_repo_list, added_repo_dict
985
986 except K8sException:
987 raise
988 except Exception as e:
989 # Do not raise errors synchronizing repos
990 self.log.error("Error synchronizing repos: {}".format(e))
991 raise Exception("Error synchronizing repos: {}".format(e))
992
993 def _get_db_repos_dict(self, repo_ids: list):
994 db_repos_dict = {}
995 for repo_id in repo_ids:
996 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
997 db_repos_dict[db_repo["name"]] = db_repo
998 return db_repos_dict
999
1000 """
1001 ####################################################################################
1002 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1003 ####################################################################################
1004 """
1005
1006 @abc.abstractmethod
1007 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1008 """
1009 Creates and returns base cluster and kube dirs and returns them.
1010 Also created helm3 dirs according to new directory specification, paths are
1011 not returned but assigned to helm environment variables
1012
1013 :param cluster_name: cluster_name
1014 :return: Dictionary with config_paths and dictionary with helm environment variables
1015 """
1016
1017 @abc.abstractmethod
1018 async def _cluster_init(self, cluster_id, namespace, paths, env):
1019 """
1020 Implements the helm version dependent cluster initialization
1021 """
1022
1023 @abc.abstractmethod
1024 async def _instances_list(self, cluster_id):
1025 """
1026 Implements the helm version dependent helm instances list
1027 """
1028
1029 @abc.abstractmethod
1030 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1031 """
1032 Implements the helm version dependent method to obtain services from a helm instance
1033 """
1034
1035 @abc.abstractmethod
1036 async def _status_kdu(
1037 self,
1038 cluster_id: str,
1039 kdu_instance: str,
1040 namespace: str = None,
1041 yaml_format: bool = False,
1042 show_error_log: bool = False,
1043 ) -> Union[str, dict]:
1044 """
1045 Implements the helm version dependent method to obtain status of a helm instance
1046 """
1047
1048 @abc.abstractmethod
1049 def _get_install_command(
1050 self,
1051 kdu_model,
1052 kdu_instance,
1053 namespace,
1054 params_str,
1055 version,
1056 atomic,
1057 timeout,
1058 kubeconfig,
1059 ) -> str:
1060 """
1061 Obtain command to be executed to delete the indicated instance
1062 """
1063
1064 @abc.abstractmethod
1065 def _get_upgrade_command(
1066 self,
1067 kdu_model,
1068 kdu_instance,
1069 namespace,
1070 params_str,
1071 version,
1072 atomic,
1073 timeout,
1074 kubeconfig,
1075 ) -> str:
1076 """
1077 Obtain command to be executed to upgrade the indicated instance
1078 """
1079
1080 @abc.abstractmethod
1081 def _get_rollback_command(
1082 self, kdu_instance, namespace, revision, kubeconfig
1083 ) -> str:
1084 """
1085 Obtain command to be executed to rollback the indicated instance
1086 """
1087
1088 @abc.abstractmethod
1089 def _get_uninstall_command(
1090 self, kdu_instance: str, namespace: str, kubeconfig: str
1091 ) -> str:
1092 """
1093 Obtain command to be executed to delete the indicated instance
1094 """
1095
1096 @abc.abstractmethod
1097 def _get_inspect_command(
1098 self, show_command: str, kdu_model: str, repo_str: str, version: str
1099 ):
1100 """
1101 Obtain command to be executed to obtain information about the kdu
1102 """
1103
1104 @abc.abstractmethod
1105 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1106 """
1107 Method call to uninstall cluster software for helm. This method is dependent
1108 of helm version
1109 For Helm v2 it will be called when Tiller must be uninstalled
1110 For Helm v3 it does nothing and does not need to be callled
1111 """
1112
1113 @abc.abstractmethod
1114 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1115 """
1116 Obtains the cluster repos identifiers
1117 """
1118
1119 """
1120 ####################################################################################
1121 ################################### P R I V A T E ##################################
1122 ####################################################################################
1123 """
1124
1125 @staticmethod
1126 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1127 if os.path.exists(filename):
1128 return True
1129 else:
1130 msg = "File {} does not exist".format(filename)
1131 if exception_if_not_exists:
1132 raise K8sException(msg)
1133
1134 @staticmethod
1135 def _remove_multiple_spaces(strobj):
1136 strobj = strobj.strip()
1137 while " " in strobj:
1138 strobj = strobj.replace(" ", " ")
1139 return strobj
1140
1141 @staticmethod
1142 def _output_to_lines(output: str) -> list:
1143 output_lines = list()
1144 lines = output.splitlines(keepends=False)
1145 for line in lines:
1146 line = line.strip()
1147 if len(line) > 0:
1148 output_lines.append(line)
1149 return output_lines
1150
1151 @staticmethod
1152 def _output_to_table(output: str) -> list:
1153 output_table = list()
1154 lines = output.splitlines(keepends=False)
1155 for line in lines:
1156 line = line.replace("\t", " ")
1157 line_list = list()
1158 output_table.append(line_list)
1159 cells = line.split(sep=" ")
1160 for cell in cells:
1161 cell = cell.strip()
1162 if len(cell) > 0:
1163 line_list.append(cell)
1164 return output_table
1165
1166 @staticmethod
1167 def _parse_services(output: str) -> list:
1168 lines = output.splitlines(keepends=False)
1169 services = []
1170 for line in lines:
1171 line = line.replace("\t", " ")
1172 cells = line.split(sep=" ")
1173 if len(cells) > 0 and cells[0].startswith("service/"):
1174 elems = cells[0].split(sep="/")
1175 if len(elems) > 1:
1176 services.append(elems[1])
1177 return services
1178
1179 @staticmethod
1180 def _get_deep(dictionary: dict, members: tuple):
1181 target = dictionary
1182 value = None
1183 try:
1184 for m in members:
1185 value = target.get(m)
1186 if not value:
1187 return None
1188 else:
1189 target = value
1190 except Exception:
1191 pass
1192 return value
1193
1194 # find key:value in several lines
1195 @staticmethod
1196 def _find_in_lines(p_lines: list, p_key: str) -> str:
1197 for line in p_lines:
1198 try:
1199 if line.startswith(p_key + ":"):
1200 parts = line.split(":")
1201 the_value = parts[1].strip()
1202 return the_value
1203 except Exception:
1204 # ignore it
1205 pass
1206 return None
1207
1208 @staticmethod
1209 def _lower_keys_list(input_list: list):
1210 """
1211 Transform the keys in a list of dictionaries to lower case and returns a new list
1212 of dictionaries
1213 """
1214 new_list = []
1215 if input_list:
1216 for dictionary in input_list:
1217 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1218 new_list.append(new_dict)
1219 return new_list
1220
1221 async def _local_async_exec(
1222 self,
1223 command: str,
1224 raise_exception_on_error: bool = False,
1225 show_error_log: bool = True,
1226 encode_utf8: bool = False,
1227 env: dict = None,
1228 ) -> (str, int):
1229 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1230 self.log.debug(
1231 "Executing async local command: {}, env: {}".format(command, env)
1232 )
1233
1234 # split command
1235 command = shlex.split(command)
1236
1237 environ = os.environ.copy()
1238 if env:
1239 environ.update(env)
1240
1241 try:
1242 process = await asyncio.create_subprocess_exec(
1243 *command,
1244 stdout=asyncio.subprocess.PIPE,
1245 stderr=asyncio.subprocess.PIPE,
1246 env=environ,
1247 )
1248
1249 # wait for command terminate
1250 stdout, stderr = await process.communicate()
1251
1252 return_code = process.returncode
1253
1254 output = ""
1255 if stdout:
1256 output = stdout.decode("utf-8").strip()
1257 # output = stdout.decode()
1258 if stderr:
1259 output = stderr.decode("utf-8").strip()
1260 # output = stderr.decode()
1261
1262 if return_code != 0 and show_error_log:
1263 self.log.debug(
1264 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1265 )
1266 else:
1267 self.log.debug("Return code: {}".format(return_code))
1268
1269 if raise_exception_on_error and return_code != 0:
1270 raise K8sException(output)
1271
1272 if encode_utf8:
1273 output = output.encode("utf-8").strip()
1274 output = str(output).replace("\\n", "\n")
1275
1276 return output, return_code
1277
1278 except asyncio.CancelledError:
1279 raise
1280 except K8sException:
1281 raise
1282 except Exception as e:
1283 msg = "Exception executing command: {} -> {}".format(command, e)
1284 self.log.error(msg)
1285 if raise_exception_on_error:
1286 raise K8sException(e) from e
1287 else:
1288 return "", -1
1289
1290 async def _local_async_exec_pipe(
1291 self,
1292 command1: str,
1293 command2: str,
1294 raise_exception_on_error: bool = True,
1295 show_error_log: bool = True,
1296 encode_utf8: bool = False,
1297 env: dict = None,
1298 ):
1299 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1300 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1301 command = "{} | {}".format(command1, command2)
1302 self.log.debug(
1303 "Executing async local command: {}, env: {}".format(command, env)
1304 )
1305
1306 # split command
1307 command1 = shlex.split(command1)
1308 command2 = shlex.split(command2)
1309
1310 environ = os.environ.copy()
1311 if env:
1312 environ.update(env)
1313
1314 try:
1315 read, write = os.pipe()
1316 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1317 os.close(write)
1318 process_2 = await asyncio.create_subprocess_exec(
1319 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1320 )
1321 os.close(read)
1322 stdout, stderr = await process_2.communicate()
1323
1324 return_code = process_2.returncode
1325
1326 output = ""
1327 if stdout:
1328 output = stdout.decode("utf-8").strip()
1329 # output = stdout.decode()
1330 if stderr:
1331 output = stderr.decode("utf-8").strip()
1332 # output = stderr.decode()
1333
1334 if return_code != 0 and show_error_log:
1335 self.log.debug(
1336 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1337 )
1338 else:
1339 self.log.debug("Return code: {}".format(return_code))
1340
1341 if raise_exception_on_error and return_code != 0:
1342 raise K8sException(output)
1343
1344 if encode_utf8:
1345 output = output.encode("utf-8").strip()
1346 output = str(output).replace("\\n", "\n")
1347
1348 return output, return_code
1349 except asyncio.CancelledError:
1350 raise
1351 except K8sException:
1352 raise
1353 except Exception as e:
1354 msg = "Exception executing command: {} -> {}".format(command, e)
1355 self.log.error(msg)
1356 if raise_exception_on_error:
1357 raise K8sException(e) from e
1358 else:
1359 return "", -1
1360
1361 async def _get_service(self, cluster_id, service_name, namespace):
1362 """
1363 Obtains the data of the specified service in the k8cluster.
1364
1365 :param cluster_id: id of a K8s cluster known by OSM
1366 :param service_name: name of the K8s service in the specified namespace
1367 :param namespace: K8s namespace used by the KDU instance
1368 :return: If successful, it will return a service with the following data:
1369 - `name` of the service
1370 - `type` type of service in the k8 cluster
1371 - `ports` List of ports offered by the service, for each port includes at least
1372 name, port, protocol
1373 - `cluster_ip` Internal ip to be used inside k8s cluster
1374 - `external_ip` List of external ips (in case they are available)
1375 """
1376
1377 # init config, env
1378 paths, env = self._init_paths_env(
1379 cluster_name=cluster_id, create_if_not_exist=True
1380 )
1381
1382 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1383 self.kubectl_command, paths["kube_config"], namespace, service_name
1384 )
1385
1386 output, _rc = await self._local_async_exec(
1387 command=command, raise_exception_on_error=True, env=env
1388 )
1389
1390 data = yaml.load(output, Loader=yaml.SafeLoader)
1391
1392 service = {
1393 "name": service_name,
1394 "type": self._get_deep(data, ("spec", "type")),
1395 "ports": self._get_deep(data, ("spec", "ports")),
1396 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1397 }
1398 if service["type"] == "LoadBalancer":
1399 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1400 ip_list = [elem["ip"] for elem in ip_map_list]
1401 service["external_ip"] = ip_list
1402
1403 return service
1404
1405 async def _exec_inspect_comand(
1406 self, inspect_command: str, kdu_model: str, repo_url: str = None
1407 ):
1408 """
1409 Obtains information about a kdu, no cluster (no env)
1410 """
1411
1412 repo_str = ""
1413 if repo_url:
1414 repo_str = " --repo {}".format(repo_url)
1415
1416 idx = kdu_model.find("/")
1417 if idx >= 0:
1418 idx += 1
1419 kdu_model = kdu_model[idx:]
1420
1421 version = ""
1422 if ":" in kdu_model:
1423 parts = kdu_model.split(sep=":")
1424 if len(parts) == 2:
1425 version = "--version {}".format(str(parts[1]))
1426 kdu_model = parts[0]
1427
1428 full_command = self._get_inspect_command(
1429 inspect_command, kdu_model, repo_str, version
1430 )
1431 output, _rc = await self._local_async_exec(
1432 command=full_command, encode_utf8=True
1433 )
1434
1435 return output
1436
1437 async def _store_status(
1438 self,
1439 cluster_id: str,
1440 operation: str,
1441 kdu_instance: str,
1442 namespace: str = None,
1443 db_dict: dict = None,
1444 ) -> None:
1445 """
1446 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1447
1448 :param cluster_id (str): the cluster where the KDU instance is deployed
1449 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1450 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1451 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1452 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1453 values for the keys:
1454 - "collection": The Mongo DB collection to write to
1455 - "filter": The query filter to use in the update process
1456 - "path": The dot separated keys which targets the object to be updated
1457 Defaults to None.
1458 """
1459
1460 try:
1461 detailed_status = await self._status_kdu(
1462 cluster_id=cluster_id,
1463 kdu_instance=kdu_instance,
1464 yaml_format=False,
1465 namespace=namespace,
1466 )
1467
1468 status = detailed_status.get("info").get("description")
1469 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1470
1471 # write status to db
1472 result = await self.write_app_status_to_db(
1473 db_dict=db_dict,
1474 status=str(status),
1475 detailed_status=str(detailed_status),
1476 operation=operation,
1477 )
1478
1479 if not result:
1480 self.log.info("Error writing in database. Task exiting...")
1481
1482 except asyncio.CancelledError as e:
1483 self.log.warning(
1484 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1485 )
1486 except Exception as e:
1487 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1488
1489 # params for use in -f file
1490 # returns values file option and filename (in order to delete it at the end)
1491 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1492 if params and len(params) > 0:
1493 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1494
1495 def get_random_number():
1496 r = random.randrange(start=1, stop=99999999)
1497 s = str(r)
1498 while len(s) < 10:
1499 s = "0" + s
1500 return s
1501
1502 params2 = dict()
1503 for key in params:
1504 value = params.get(key)
1505 if "!!yaml" in str(value):
1506 value = yaml.load(value[7:])
1507 params2[key] = value
1508
1509 values_file = get_random_number() + ".yaml"
1510 with open(values_file, "w") as stream:
1511 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1512
1513 return "-f {}".format(values_file), values_file
1514
1515 return "", None
1516
1517 # params for use in --set option
1518 @staticmethod
1519 def _params_to_set_option(params: dict) -> str:
1520 params_str = ""
1521 if params and len(params) > 0:
1522 start = True
1523 for key in params:
1524 value = params.get(key, None)
1525 if value is not None:
1526 if start:
1527 params_str += "--set "
1528 start = False
1529 else:
1530 params_str += ","
1531 params_str += "{}={}".format(key, value)
1532 return params_str
1533
1534 @staticmethod
1535 def generate_kdu_instance_name(**kwargs):
1536 chart_name = kwargs["kdu_model"]
1537 # check embeded chart (file or dir)
1538 if chart_name.startswith("/"):
1539 # extract file or directory name
1540 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1541 # check URL
1542 elif "://" in chart_name:
1543 # extract last portion of URL
1544 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1545
1546 name = ""
1547 for c in chart_name:
1548 if c.isalpha() or c.isnumeric():
1549 name += c
1550 else:
1551 name += "-"
1552 if len(name) > 35:
1553 name = name[0:35]
1554
1555 # if does not start with alpha character, prefix 'a'
1556 if not name[0].isalpha():
1557 name = "a" + name
1558
1559 name += "-"
1560
1561 def get_random_number():
1562 r = random.randrange(start=1, stop=99999999)
1563 s = str(r)
1564 s = s.rjust(10, "0")
1565 return s
1566
1567 name = name + get_random_number()
1568 return name.lower()
1569
1570 def _split_repo(self, kdu_model: str) -> str:
1571 repo_name = None
1572 idx = kdu_model.find("/")
1573 if idx >= 0:
1574 repo_name = kdu_model[:idx]
1575 return repo_name