Bug 2007 fixed: removed the `while True` loop from K8sHelmBaseConnector._store_status
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """Build a Helm connector bound to a file system, a database and two CLIs.

    :param fs: file system object holding the per-cluster kubernetes and helm
        configuration
    :param db: database object where the current operation status is written
    :param kubectl_command: path to the kubectl executable (must exist)
    :param helm_command: path to the helm executable (must exist)
    :param log: logger, forwarded to the parent connector
    :param on_update_db: callback called when the connector updates the database
    :raises K8sException: (from ``_check_file_exists``) when either executable
        path does not exist
    """
    # Let the generic K8s connector set up db, logger and update callback.
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # Seed the module-level RNG, used for release name generation.
    random.seed(time.time())

    self.fs = fs

    # Both binaries are mandatory: each check raises if the path is missing.
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # A literal "None" string in the configuration means "no stable repo".
    configured_repo = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_repo == "None" else configured_repo
92
@staticmethod
def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
    """Split a stored cluster identifier into ``(namespace, cluster_id)``.

    The database may hold either ``"namespace:cluster_id"`` or, for backward
    compatibility, a bare ``cluster_id``; in the latter case the namespace is
    the empty string. Only the last ``:`` acts as separator.
    """
    separator_pos = cluster_uuid.rfind(":")
    if separator_pos < 0:
        return "", cluster_uuid
    return cluster_uuid[:separator_pos], cluster_uuid[separator_pos + 1:]
101
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse; its namespace
        part (if any) overrides ``namespace``
    :param kwargs: Additional parameters (None yet)
    :return: tuple ``("namespace:cluster_id", installed)`` where ``installed``
        is what ``_cluster_init`` returned (True if the connector installed
        some software in the cluster). On error, an exception is raised.
    """

    if reuse_cluster_uuid:
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # Write the credentials without ever exposing them to other users.
    # NOTE: the third positional argument of the builtin open() is
    # *buffering*, not a permission mode, so the mode must be given to
    # os.open (applied when the file is created). The explicit chmod covers
    # a pre-existing file, and is done before the secret is written.
    kube_config = paths["kube_config"]
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(kube_config, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as f:
        os.chmod(kube_config, 0o600)
        f.write(k8s_creds)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
151
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """Register a helm repository in a cluster and refresh its index.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :param name: repository name
    :param url: repository URL
    :param repo_type: only used in the log message
    :raises: whatever ``_local_async_exec`` raises when ``helm repo add``
        fails (it runs with ``raise_exception_on_error=True``); a failing
        ``helm repo update`` is tolerated
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # helm repo add name url
    command = "env KUBECONFIG={} {} repo add {} {}".format(
        paths["kube_config"], self._helm_command, name, url
    )
    self.log.debug("adding repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # helm repo update
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, name
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs back from the same directory synced above: the raw cluster_uuid
    # may carry a "namespace:" prefix and is not the cluster directory name
    self.fs.reverse_sync(from_path=cluster_id)
190
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """Run ``helm repo update <name>`` for the given cluster.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``;
        only the cluster id part selects the local cluster directory
    :param name: repository name
    :param repo_type: only used in the log message
    A failing update is tolerated (``raise_exception_on_error=False``).
    """
    # Callers pass either form; using the raw "namespace:id" string as a
    # directory name would point at the wrong (non-existent) cluster dir.
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name)
    )

    # init_env
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # helm repo update
    command = "{} repo update {}".format(self._helm_command, name)
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
215
async def repo_list(self, cluster_uuid: str) -> list:
    """Return the helm repositories registered in a cluster.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :return: the repositories parsed from ``helm repo list --output yaml``
        with keys lower-cased by ``_lower_keys_list``; ``[]`` when the
        command fails or prints nothing (e.g. no repositories configured)
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    self.fs.sync(from_path=cluster_id)

    list_command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )
    # Errors are not raised: an empty repo list makes helm exit non-zero.
    output, return_code = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    self.fs.reverse_sync(from_path=cluster_id)

    if return_code != 0 or not output:
        return []
    repos = yaml.load(output, Loader=yaml.SafeLoader)
    # helm2 and helm3 differ in key casing; normalise to lower case.
    return self._lower_keys_list(repos)
255
async def repo_remove(self, cluster_uuid: str, name: str):
    """Unregister helm repository ``name`` from a cluster.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :param name: repository name
    Failures propagate, since the command runs with
    ``raise_exception_on_error=True``.
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    self.fs.sync(from_path=cluster_id)

    remove_cmd = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, name
    )
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=env
    )

    self.fs.reverse_sync(from_path=cluster_id)
278
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the local helm environment of the cluster and, optionally, the
    software the connector installed in it.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :param force: when ``uninstall_sw`` is set and releases exist, uninstall
        them first; without ``force`` the releases are kept and the connector
        software is not removed
    :param uninstall_sw: also uninstall the connector software
        (``_uninstall_sw``)
    :param kwargs: Additional parameters (None yet)
    :return: True; errors uninstalling individual releases are only logged
    """
    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_id, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # pre-bind so the error log below never hits an unbound name
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                # Allow to remove k8s cluster without removing Tiller
                uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = os.path.join(self.fs.path, cluster_id)
    shutil.rmtree(direct, ignore_errors=True)

    return True
350
async def _install_impl(
    self,
    cluster_id: str,
    kdu_model: str,
    paths: dict,
    env: dict,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """Install ``kdu_model`` as release ``kdu_instance`` and record its status.

    :param cluster_id: cluster id (a ``namespace:`` prefix is not expected)
    :param kdu_model: chart reference, optionally suffixed with ``:<version>``
    :param paths: ignored; recomputed from ``cluster_id``
    :param env: ignored; recomputed together with ``paths``
    :param kdu_instance: release name
    :param atomic: when True, helm runs in one task while ``_store_status``
        runs in another one that is cancelled once helm finishes
    :param timeout: forwarded to ``_get_install_command``
    :param params: values, converted by ``_params_to_file_option`` into an
        option string and possibly a temporary file (removed afterwards)
    :param db_dict: forwarded to ``_store_status``
    :param kdu_name: not used by this method
    :param namespace: target namespace
    :raises K8sException: when the helm command returns a non-zero code
    """
    # init env, paths (the received ones are recomputed for this cluster)
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version: an optional single ":<version>" suffix of the model
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        repo = self._split_repo(kdu_model)
        if repo:
            # repo_update is a coroutine: without await it never runs
            await self.repo_update(cluster_id, repo)

        command = self._get_install_command(
            kdu_model,
            kdu_instance,
            namespace,
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # cancel status task, also if waiting was interrupted
                status_task.cancel()

            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, even if the install failed
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="install",
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)
450
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """Upgrade release ``kdu_instance`` to ``kdu_model`` and return its revision.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :param kdu_instance: name of the existing release
    :param kdu_model: chart reference, optionally suffixed with ``:<version>``
    :param atomic: when True, helm runs in one task while ``_store_status``
        runs in another one that is cancelled once helm finishes
    :param timeout: forwarded to ``_get_upgrade_command``
    :param params: values, converted by ``_params_to_file_option`` into an
        option string and possibly a temporary file (removed afterwards)
    :param db_dict: forwarded to ``_store_status``
    :return: the new revision number, or 0 if the release cannot be found
        after the upgrade
    :raises K8sException: if the release does not exist or helm fails
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version: an optional single ":<version>" suffix of the model
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        repo = self._split_repo(kdu_model)
        if repo:
            # must be awaited to actually run; use the bare cluster id, which
            # is the name of the local cluster directory
            await self.repo_update(cluster_id, repo)

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # cancel status task, also if waiting was interrupted
                status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, even if the upgrade failed
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
573
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """Scaling is not supported by the Helm base connector.

    :raises NotImplementedError: always
    """
    message = "Method not implemented"
    raise NotImplementedError(message)
583
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """Reading the scale count is not supported by the Helm base connector.

    :raises NotImplementedError: always
    """
    message = "Method not implemented"
    raise NotImplementedError(message)
591
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """Roll release ``kdu_instance`` back to ``revision``.

    Helm runs in one task while ``_store_status`` runs in another one that
    is cancelled once helm finishes; the status is then written once more.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :param kdu_instance: release name
    :param revision: target revision, forwarded to ``_get_rollback_command``
    :param db_dict: forwarded to ``_store_status``
    :return: the new revision number, or 0 if the release cannot be found
        afterwards
    :raises K8sException: if the release does not exist or helm fails
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )
    self.fs.sync(from_path=cluster_id)

    # The release must exist: its namespace is needed for the command.
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    self.fs.sync(from_path=cluster_id)

    release_ns = instance_info["namespace"]
    command = self._get_rollback_command(
        kdu_instance, release_ns, revision, paths["kube_config"]
    )
    self.log.debug("rolling_back: {}".format(command))

    status_args = dict(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=release_ns,
        db_dict=db_dict,
        operation="rollback",
    )

    helm_task = asyncio.ensure_future(
        coro_or_future=self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    progress_task = asyncio.ensure_future(
        coro_or_future=self._store_status(**status_args)
    )
    await asyncio.wait([helm_task])
    progress_task.cancel()
    output, rc = helm_task.result()

    # Final status write, after helm has finished.
    await self._store_status(**status_args)

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    self.fs.reverse_sync(from_path=cluster_id)

    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0
    new_revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(new_revision))
    return new_revision
677
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance does not exist (nothing to do);
        otherwise the command output converted by ``_output_to_table``.
        Command failures propagate (``raise_exception_on_error=True``).
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )
    self.fs.sync(from_path=cluster_id)

    # An unknown release is treated as already uninstalled.
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        self.log.warning("kdu_instance {} not found".format(kdu_instance))
        return True

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    self.fs.sync(from_path=cluster_id)

    uninstall_cmd = self._get_uninstall_command(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=uninstall_cmd, raise_exception_on_error=True, env=env
    )

    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(output)
722
async def instances_list(self, cluster_uuid: str) -> list:
    """Return the releases deployed in a cluster.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :return: whatever the version-specific ``_instances_list`` returns
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    self.fs.sync(from_path=cluster_id)
    releases = await self._instances_list(cluster_id)
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
744
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """Return the first release whose ``name`` equals ``kdu_instance``, else None."""
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    match = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if match is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return match
752
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Primitives (actions) are not available for Helm-based KDUs.

    All arguments are ignored.

    :raises K8sException: always
    """
    unsupported = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported)
779
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return the services that belong to a KDU instance.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: one entry per service name reported by ``_get_services``, each
        produced by ``_get_service`` (in the same order)
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    self.fs.sync(from_path=cluster_id)

    names = await self._get_services(
        cluster_id, kdu_instance, namespace, paths["kube_config"]
    )
    # Services are fetched one after another, preserving order.
    services = [
        await self._get_service(cluster_id, svc_name, namespace)
        for svc_name in names
    ]

    self.fs.reverse_sync(from_path=cluster_id)
    return services
828
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return one service of a cluster, as produced by ``_get_service``.

    :param cluster_uuid: ``namespace:cluster_id`` or a bare ``cluster_id``
    :param service_name: name of the service
    :param namespace: namespace the service lives in
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    self.fs.sync(from_path=cluster_id)
    found = await self._get_service(cluster_id, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_id)

    return found
850
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
) -> Union[str, dict]:
    """Return the status of a KDU instance, as produced by ``_status_kdu``.

    The instance is first looked up with ``_instances_list`` to obtain its
    namespace; ``_status_kdu`` is then called with ``show_error_log=True``.

    :param cluster_uuid: UUID of a K8s cluster known by OSM (or
        ``namespace:cluster_id``)
    :param kdu_instance: unique name for the KDU instance
    :param yaml_format: whether the result is returned as a YAML string
        instead of a dictionary (forwarded to ``_status_kdu``)
    :param kwargs: Additional parameters (None yet)
    :return: per the original contract: the K8s ``namespace``, the ``state``
        of the release (UNKNOWN, DEPLOYED, DELETED, SUPERSEDED, FAILED or
        DELETING), its ``resources`` and last ``deployment_time``
    :raises K8sException: if the instance does not exist in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    self.fs.sync(from_path=cluster_id)

    releases = await self._instances_list(cluster_id=cluster_id)
    instance = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if instance is None:
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_id
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    self.fs.reverse_sync(from_path=cluster_id)

    return status
915
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Return the default values of a chart.

    Delegates to ``_exec_inspect_comand`` with ``inspect_command="values"``.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )
    inspected = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspected
927
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Return the README of a chart.

    Delegates to ``_exec_inspect_comand`` with ``inspect_command="readme"``.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )
    readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
937
async def synchronize_repos(self, cluster_uuid: str):
    """Make the cluster's helm repositories match the ones stored in the db.

    Repositories present in the db but missing locally (or registered with a
    different URL) are (re-)added. Local repositories not present in the db
    are removed, except ``stable``; failures removing those are only logged.

    :param cluster_uuid: cluster identifier as used by the other repo methods
    :return: tuple ``(deleted_repo_list, added_repo_dict)``. NOTE(review):
        ``deleted_repo_list`` mixes db ids (repos re-added because their URL
        changed) and names (repos removed because absent from the db).
        ``added_repo_dict`` maps db id -> repo name.
    :raises K8sException: if adding a db repository fails
    :raises Exception: on any other unexpected error
    """
    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # bind the id before anything can fail, so the error message
            # below never uses an unbound or stale value
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and add again".format(
                                db_repo["name"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a stale local repo must not abort the sync
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # Unexpected errors are logged and re-raised as a generic Exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
1003
def _get_db_repos_dict(self, repo_ids: list):
    """Fetch each ``k8srepos`` record by id and index the records by name.

    Records are fetched in ``repo_ids`` order; if several ids resolve to the
    same name, the record fetched last wins.
    """
    fetched = (self.db.get_one("k8srepos", {"_id": repo_id}) for repo_id in repo_ids)
    return {repo["name"]: repo for repo in fetched}
1010
1011 """
1012 ####################################################################################
1013 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1014 ####################################################################################
1015 """
1016
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """Return the configuration paths and helm environment of a cluster.

    Implementations set up the base cluster and kube directories, plus the
    helm3 directories, which are not returned but exposed through helm
    environment variables.

    :param cluster_name: cluster identifier used to build the paths
    :param create_if_not_exist: whether missing directories may be created
    :return: tuple ``(paths, env)``: a dictionary of configuration paths
        (including ``"kube_config"``) and a dictionary of helm environment
        variables
    """
1027
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """Run the helm-version-specific initialization of a cluster.

    ``init_env`` returns the result as its "connector installed software"
    flag.
    """
1033
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """List the helm releases of a cluster (helm-version-specific).

    Callers match entries on ``"name"`` and read ``"namespace"`` from them.
    """
1039
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """Return the service names of a helm release (helm-version-specific)."""
1045
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict]:
    """Return the status of a helm release (helm-version-specific).

    The result is a YAML string when ``yaml_format`` is set, a dictionary
    otherwise.
    """
1058
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain the command to be executed to install ``kdu_model`` as release
    ``kdu_instance`` (optionally pinned to ``version``) in ``namespace``.
    """
1074
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """Build the shell command that upgrades release ``kdu_instance``."""
1090
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """Build the shell command that rolls release ``kdu_instance`` back to ``revision``."""
1098
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    :param kdu_instance: name of the helm instance to delete
    :param namespace: K8s namespace of the instance
    :param kubeconfig: kubeconfig to use (presumably a file path — TODO confirm)
    :return: the command line as a string
    """
1106
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
) -> str:
    """
    Obtain command to be executed to obtain information about the kdu

    As called from _exec_inspect_comand:
    :param show_command: the inspect/show sub-command requested by the caller
    :param kdu_model: chart name, with any "<repo>/" prefix and ":<version>"
        suffix already removed
    :param repo_str: either "" or " --repo <url>"
    :param version: either "" or "--version <version>"
    :return: the command line, executed by _exec_inspect_comand through
        _local_async_exec
    """
1114
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method called to uninstall the cluster software for helm. Its behavior
    depends on the helm version:
    - For Helm v2 it is called when Tiller must be uninstalled
    - For Helm v3 it does nothing and does not need to be called

    :param cluster_id: id of a K8s cluster known by OSM
    :param namespace: K8s namespace where the cluster software was installed
    """
1123
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    :param cluster_uuid: id of a K8s cluster known by OSM
    :return: list of identifiers of the helm chart repositories associated with
        the cluster
    """
1129
1130 """
1131 ####################################################################################
1132 ################################### P R I V A T E ##################################
1133 ####################################################################################
1134 """
1135
@staticmethod
def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
    """
    Check whether a path exists on the local file system.

    :param filename: path to check
    :param exception_if_not_exists: if True, a missing path raises instead of
        returning False
    :return: True if the path exists, False otherwise
    :raises K8sException: if the path does not exist and exception_if_not_exists
        is True
    """
    if os.path.exists(filename):
        return True
    if exception_if_not_exists:
        raise K8sException("File {} does not exist".format(filename))
    # explicit boolean result instead of the implicit None
    return False
1144
@staticmethod
def _remove_multiple_spaces(strobj):
    """
    Strip a string and collapse every run of space characters into one space.

    Only the space character is collapsed inside the string; tabs and other
    whitespace are kept (they are only removed at both ends by strip()).

    :param strobj: string to normalize
    :return: normalized string
    """
    # Splitting on " " yields empty items for consecutive spaces; dropping them
    # and re-joining collapses each run in a single linear pass.
    return " ".join(part for part in strobj.strip().split(" ") if part)
1151
@staticmethod
def _output_to_lines(output: str) -> list:
    """
    Return the non-blank lines of a command output, each one stripped.
    """
    return [text for text in map(str.strip, output.splitlines()) if text]
1161
@staticmethod
def _output_to_table(output: str) -> list:
    """
    Turn a command output into a table: one row per line (blank lines become
    empty rows), with cells separated by spaces or tabs and empty cells dropped.
    """
    table = []
    for row_text in output.splitlines():
        raw_cells = row_text.replace("\t", " ").split(" ")
        table.append([cell for cell in map(str.strip, raw_cells) if cell])
    return table
1176
@staticmethod
def _parse_services(output: str) -> list:
    """
    Extract service names from kubectl-like output.

    A line contributes when its first space/tab separated cell starts with
    "service/"; the name is the text between the first and second "/".
    """
    first_cells = (
        row.replace("\t", " ").split(" ")[0] for row in output.splitlines()
    )
    return [
        cell.split("/")[1] for cell in first_cells if cell.startswith("service/")
    ]
1189
@staticmethod
def _get_deep(dictionary: dict, members: tuple):
    """
    Follow a path of keys through nested dictionaries.

    :param dictionary: root object to traverse
    :param members: keys to follow, outermost first
    :return: the value at the end of the path, or None when:
        - members is empty;
        - a key is missing or its value is falsy (0, "", [], {} are reported as None);
        - an intermediate value has no usable .get() (e.g. a list or None).
    """
    target = dictionary
    value = None
    try:
        for m in members:
            value = target.get(m)
            if not value:
                return None
            target = value
    except Exception:
        # The path cannot be followed (non-mapping intermediate value, malformed
        # members...). Report "not found" rather than the partially traversed
        # value, which has a different meaning from the requested one.
        return None
    return value
1204
@staticmethod
def _find_in_lines(p_lines: list, p_key: str) -> str:
    """
    Find the value of a "key: value" line.

    :param p_lines: lines to search
    :param p_key: key to look for; a line matches when it starts with "<p_key>:"
    :return: the stripped text after the first ":" of the first matching line
        (values may themselves contain ":"), or None if no line matches
    """
    for line in p_lines:
        try:
            if line.startswith(p_key + ":"):
                # split only on the first colon so values such as timestamps
                # ("10:12:34") or URLs are returned complete
                return line.split(":", 1)[1].strip()
        except Exception:
            # ignore malformed entries (e.g. non-string lines)
            pass
    return None
1218
@staticmethod
def _lower_keys_list(input_list: list):
    """
    Return a new list holding a copy of each dictionary of input_list with its
    keys lower-cased. A falsy input_list (None, []) yields an empty list.
    """
    if not input_list:
        return []
    return [{key.lower(): val for key, val in entry.items()} for entry in input_list]
1231
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command asynchronously (without a shell) and capture its output.

    :param command: command line. Repeated spaces are collapsed and the result is
        split with shlex.split; split errors (e.g. unbalanced quotes) propagate.
    :param raise_exception_on_error: raise K8sException when the command returns
        a non-zero code or cannot be launched, instead of returning
    :param show_error_log: log the output of a failing command (debug level)
    :param encode_utf8: UTF-8 encode the output and convert it back with str()
        (giving a "b'...'" representation), then turn escaped "\\n" into newlines
    :param env: extra environment variables merged over a copy of os.environ
    :return: tuple (output, return_code).
        On success, output is the stripped stdout, falling back to stderr only
        when stdout is empty. On failure, stderr is preferred so that the error
        message is reported. ("", -1) is returned when the command cannot be
        launched and raise_exception_on_error is False.
    :raises K8sException: only when raise_exception_on_error is True
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        stdout_text = stdout.decode("utf-8").strip() if stdout else ""
        stderr_text = stderr.decode("utf-8").strip() if stderr else ""
        if return_code != 0:
            # report the error message when there is one
            output = stderr_text or stdout_text
        else:
            # warnings written to stderr must not hide the actual result
            output = stdout_text or stderr_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        # do not leave the child process running when the awaiting task is
        # cancelled
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1301
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Run "command1 | command2" locally without a shell and capture the output of
    command2.

    The two processes are connected with an OS pipe.

    :param command1: producer command line; its stdout feeds command2's stdin
    :param command2: consumer command line whose stdout is returned
    :param raise_exception_on_error: raise K8sException when command2 returns a
        non-zero code or a process cannot be launched
    :param show_error_log: log the output of a failing command2 (debug level)
    :param encode_utf8: UTF-8 encode the output and convert it back with str()
        (giving a "b'...'" representation), then turn escaped "\\n" into newlines
    :param env: extra environment variables merged over a copy of os.environ
    :return: tuple (output, return_code), where return_code is command2's.
        The stderr of both commands is not captured (it is inherited).
        ("", -1) is returned on launch errors when raise_exception_on_error is
        False.
    :raises K8sException: only when raise_exception_on_error is True
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    read_fd = write_fd = None
    process_1 = process_2 = None
    try:
        read_fd, write_fd = os.pipe()
        process_1 = await asyncio.create_subprocess_exec(
            *command1, stdout=write_fd, env=environ
        )
        # the child holds its own copy of the write end; close ours so that
        # command2 receives EOF when command1 finishes
        os.close(write_fd)
        write_fd = None
        process_2 = await asyncio.create_subprocess_exec(
            *command2, stdin=read_fd, stdout=asyncio.subprocess.PIPE, env=environ
        )
        os.close(read_fd)
        read_fd = None
        stdout, _ = await process_2.communicate()

        # reap command1 so it does not linger as a zombie process
        return_code_1 = await process_1.wait()
        if return_code_1 != 0:
            self.log.debug(
                "First command of the pipe returned code {}".format(return_code_1)
            )

        return_code = process_2.returncode

        output = stdout.decode("utf-8").strip() if stdout else ""

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        # do not leave the children running when the awaiting task is cancelled
        for proc in (process_1, process_2):
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
    finally:
        # release pipe ends still owned here (only when a spawn failed)
        for fd in (read_fd, write_fd):
            if fd is not None:
                try:
                    os.close(fd)
                except OSError:
                    pass
1373
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` (LoadBalancer services only) List of external ips; empty
        while none has been assigned yet
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # status.loadBalancer.ingress may be absent (e.g. while the load
        # balancer is pending), so _get_deep returns None. An ingress entry may
        # also carry a "hostname" instead of an "ip". Keep only real ips.
        ingress_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"]
            for elem in ingress_list
            if isinstance(elem, dict) and elem.get("ip")
        ]

    return service
1417
async def _exec_inspect_comand(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Run an inspect/show command about a chart. No cluster (hence no env) is
    involved.

    kdu_model may start with "<repo>/" (dropped) and end with ":<version>"
    (turned into a "--version" option when it contains exactly one ":").

    :param inspect_command: sub-command handed to _get_inspect_command
    :param kdu_model: chart reference, e.g. "stable/openldap:1.2.4"
    :param repo_url: optional repository URL, passed as " --repo <url>"
    :return: output of _local_async_exec called with encode_utf8=True
    """
    repo_str = " --repo {}".format(repo_url) if repo_url else ""

    # drop the repository name: everything up to and including the first "/"
    _, slash, chart_ref = kdu_model.partition("/")
    if slash:
        kdu_model = chart_ref

    # "<chart>:<version>" -> chart name plus a --version option
    version = ""
    pieces = kdu_model.split(":")
    if len(pieces) == 2:
        kdu_model, chart_version = pieces
        version = "--version {}".format(str(chart_version))

    full_command = self._get_inspect_command(
        inspect_command, kdu_model, repo_str, version
    )
    output, _ = await self._local_async_exec(
        command=full_command, encode_utf8=True
    )
    return output
1449
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    db_dict: dict = None,
) -> None:
    """
    Read the current Helm status of a KDU instance and persist it in the database.

    The status summary is the "description" inside the "info" section of the
    dictionary returned by _status_kdu (called with yaml_format=False). It is
    written, together with the full status rendered as a string, through
    write_app_status_to_db. Errors are logged as warnings and never propagated;
    this includes cancellation of the task running this coroutine.

    :param cluster_id (str): the cluster where the KDU instance is deployed
    :param operation (str): operation the status relates to (e.g. "install" or "upgrade")
    :param kdu_instance (str): the KDU instance whose status is obtained
    :param namespace (str): Kubernetes namespace of the KDU instance. Defaults to None
    :param db_dict (dict): database information, with the keys:
        - "collection": The Mongo DB collection to write to
        - "filter": The query filter to use in the update process
        - "path": The dot separated keys which targets the object to be updated
        Defaults to None.
    """

    async def _read_and_persist():
        kdu_status = await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            yaml_format=False,
            namespace=namespace,
        )
        summary = kdu_status.get("info").get("description")
        self.log.debug(f"Status for KDU {kdu_instance} obtained: {summary}.")

        written = await self.write_app_status_to_db(
            db_dict=db_dict,
            status=str(summary),
            detailed_status=str(kdu_status),
            operation=operation,
        )
        if not written:
            self.log.info("Error writing in database. Task exiting...")

    try:
        await _read_and_persist()
    except asyncio.CancelledError as exc:
        # cancellation is reported but deliberately not re-raised
        self.log.warning(
            f"Exception in method {self._store_status.__name__} (task cancelled): {exc}"
        )
    except Exception as exc:
        self.log.warning(f"Exception in method {self._store_status.__name__}: {exc}")
1501
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump params into a randomly named YAML values file to be used with "-f".

    String values containing "!!yaml" are parsed as YAML before being dumped.
    The first 7 characters are dropped, which assumes the value starts with the
    "!!yaml " marker.

    NOTE(review): the file name is relative, so it is created in the current
    working directory. The paths computed by _init_paths_env are not used here.

    :param cluster_id: cluster whose paths/env are initialized (created if needed)
    :param params: values to write; when empty or None no file is created
    :return: tuple ("-f <file>", <file>) so the caller can delete the file
        afterwards, or ("", None) when there are no params
    """
    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    params2 = dict()
    for key, value in params.items():
        if "!!yaml" in str(value):
            # Explicit SafeLoader (as _get_service does). yaml.load without a
            # Loader is unsafe on external data and fails on PyYAML >= 6.
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        params2[key] = value

    # random 10-digit zero-padded file name
    values_file = str(random.randrange(start=1, stop=99999999)).rjust(10, "0") + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1530
@staticmethod
def _params_to_set_option(params: dict) -> str:
    """
    Build a helm "--set" option from params.

    Entries whose value is None are skipped. The others are rendered as
    "key=value" and joined with commas.

    :return: e.g. "--set a=1,b=x", or "" when there is nothing to set
    """
    if not params:
        return ""
    assignments = [
        "{}={}".format(key, value)
        for key, value in params.items()
        if value is not None
    ]
    return ("--set " + ",".join(assignments)) if assignments else ""
1547
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Generate a random, lower-case helm release name from a chart reference.

    Only kwargs["kdu_model"] is used. For local paths (starting with "/") and
    URLs (containing "://"), only the last path component is kept; trailing
    slashes are ignored. The name is then built as follows:
    - every character that is not an ASCII letter or digit becomes "-";
    - the result is cut to 35 characters;
    - "a" is prefixed when it does not start with a letter (or is empty);
    - "-" and a 10-digit zero-padded random number are appended.

    :return: generated name, e.g. "stable-openldap-0012345678"
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep the last portion only. Trailing
    # "/" are removed first, otherwise "/path/to/chart/" gives an empty name.
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    # release names must be DNS-compatible: only ASCII alphanumerics survive
    # (str.isalpha/isnumeric would also accept e.g. "é" or "²")
    name = "".join(c if c.isascii() and c.isalnum() else "-" for c in chart_name)
    name = name[:35]

    # if it does not start with an alpha character (or is empty), prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return (name + "-" + suffix).lower()
1583
async def _split_repo(self, kdu_model: str) -> Union[str, None]:
    """
    Return the repository part of a chart reference: the text before the first
    "/" (e.g. "stable" for "stable/mysql"), or None when there is no "/".
    """
    repo_name, separator, _ = kdu_model.partition("/")
    return repo_name if separator else None