Bug 2122 fixed: used cluster_id instead of cluster_uuid in repo_add.reverse_sync
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95 """
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
98 """
99 namespace, _, cluster_id = cluster_uuid.rpartition(":")
100 return namespace, cluster_id
101
102 async def init_env(
103 self,
104 k8s_creds: str,
105 namespace: str = "kube-system",
106 reuse_cluster_uuid=None,
107 **kwargs,
108 ) -> (str, bool):
109 """
110 It prepares a given K8s cluster environment to run Charts
111
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
113 '.kube/config'
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
117 :param kwargs: Additional parameters (None yet)
118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
121 """
122
123 if reuse_cluster_uuid:
124 namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
125 namespace = namespace_ or namespace
126 else:
127 cluster_id = str(uuid4())
128 cluster_uuid = "{}:{}".format(namespace, cluster_id)
129
130 self.log.debug(
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
132 )
133
134 paths, env = self._init_paths_env(
135 cluster_name=cluster_id, create_if_not_exist=True
136 )
137 mode = stat.S_IRUSR | stat.S_IWUSR
138 with open(paths["kube_config"], "w", mode) as f:
139 f.write(k8s_creds)
140 os.chmod(paths["kube_config"], 0o600)
141
142 # Code with initialization specific of helm version
143 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
144
145 # sync fs with local data
146 self.fs.reverse_sync(from_path=cluster_id)
147
148 self.log.info("Cluster {} initialized".format(cluster_id))
149
150 return cluster_uuid, n2vc_installed_sw
151
152 async def repo_add(
153 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
154 ):
155 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
156 self.log.debug(
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_id, repo_type, name, url
159 )
160 )
161
162 # init_env
163 paths, env = self._init_paths_env(
164 cluster_name=cluster_id, create_if_not_exist=True
165 )
166
167 # sync local dir
168 self.fs.sync(from_path=cluster_id)
169
170 # helm repo add name url
171 command = "env KUBECONFIG={} {} repo add {} {}".format(
172 paths["kube_config"], self._helm_command, name, url
173 )
174 self.log.debug("adding repo: {}".format(command))
175 await self._local_async_exec(
176 command=command, raise_exception_on_error=True, env=env
177 )
178
179 # helm repo update
180 command = "env KUBECONFIG={} {} repo update {}".format(
181 paths["kube_config"], self._helm_command, name
182 )
183 self.log.debug("updating repo: {}".format(command))
184 await self._local_async_exec(
185 command=command, raise_exception_on_error=False, env=env
186 )
187
188 # sync fs
189 self.fs.reverse_sync(from_path=cluster_id)
190
191 async def repo_update(self, cluster_id: str, name: str, repo_type: str = "chart"):
192 self.log.debug(
193 "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name)
194 )
195
196 # init_env
197 paths, env = self._init_paths_env(
198 cluster_name=cluster_id, create_if_not_exist=True
199 )
200
201 # sync local dir
202 self.fs.sync(from_path=cluster_id)
203
204 # helm repo update
205 command = "{} repo update {}".format(self._helm_command, name)
206 self.log.debug("updating repo: {}".format(command))
207 await self._local_async_exec(
208 command=command, raise_exception_on_error=False, env=env
209 )
210
211 # sync fs
212 self.fs.reverse_sync(from_path=cluster_id)
213
214 async def repo_list(self, cluster_uuid: str) -> list:
215 """
216 Get the list of registered repositories
217
218 :return: list of registered repositories: [ (name, url) .... ]
219 """
220
221 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
222 self.log.debug("list repositories for cluster {}".format(cluster_id))
223
224 # config filename
225 paths, env = self._init_paths_env(
226 cluster_name=cluster_id, create_if_not_exist=True
227 )
228
229 # sync local dir
230 self.fs.sync(from_path=cluster_id)
231
232 command = "env KUBECONFIG={} {} repo list --output yaml".format(
233 paths["kube_config"], self._helm_command
234 )
235
236 # Set exception to false because if there are no repos just want an empty list
237 output, _rc = await self._local_async_exec(
238 command=command, raise_exception_on_error=False, env=env
239 )
240
241 # sync fs
242 self.fs.reverse_sync(from_path=cluster_id)
243
244 if _rc == 0:
245 if output and len(output) > 0:
246 repos = yaml.load(output, Loader=yaml.SafeLoader)
247 # unify format between helm2 and helm3 setting all keys lowercase
248 return self._lower_keys_list(repos)
249 else:
250 return []
251 else:
252 return []
253
254 async def repo_remove(self, cluster_uuid: str, name: str):
255
256 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
257 self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
258
259 # init env, paths
260 paths, env = self._init_paths_env(
261 cluster_name=cluster_id, create_if_not_exist=True
262 )
263
264 # sync local dir
265 self.fs.sync(from_path=cluster_id)
266
267 command = "env KUBECONFIG={} {} repo remove {}".format(
268 paths["kube_config"], self._helm_command, name
269 )
270 await self._local_async_exec(
271 command=command, raise_exception_on_error=True, env=env
272 )
273
274 # sync fs
275 self.fs.reverse_sync(from_path=cluster_id)
276
277 async def reset(
278 self,
279 cluster_uuid: str,
280 force: bool = False,
281 uninstall_sw: bool = False,
282 **kwargs,
283 ) -> bool:
284 """Reset a cluster
285
286 Resets the Kubernetes cluster by removing the helm deployment that represents it.
287
288 :param cluster_uuid: The UUID of the cluster to reset
289 :param force: Boolean to force the reset
290 :param uninstall_sw: Boolean to force the reset
291 :param kwargs: Additional parameters (None yet)
292 :return: Returns True if successful or raises an exception.
293 """
294 namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
295 self.log.debug(
296 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
297 cluster_id, uninstall_sw
298 )
299 )
300
301 # sync local dir
302 self.fs.sync(from_path=cluster_id)
303
304 # uninstall releases if needed.
305 if uninstall_sw:
306 releases = await self.instances_list(cluster_uuid=cluster_uuid)
307 if len(releases) > 0:
308 if force:
309 for r in releases:
310 try:
311 kdu_instance = r.get("name")
312 chart = r.get("chart")
313 self.log.debug(
314 "Uninstalling {} -> {}".format(chart, kdu_instance)
315 )
316 await self.uninstall(
317 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
318 )
319 except Exception as e:
320 # will not raise exception as it was found
321 # that in some cases of previously installed helm releases it
322 # raised an error
323 self.log.warn(
324 "Error uninstalling release {}: {}".format(
325 kdu_instance, e
326 )
327 )
328 else:
329 msg = (
330 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
331 ).format(cluster_id)
332 self.log.warn(msg)
333 uninstall_sw = (
334 False # Allow to remove k8s cluster without removing Tiller
335 )
336
337 if uninstall_sw:
338 await self._uninstall_sw(cluster_id, namespace)
339
340 # delete cluster directory
341 self.log.debug("Removing directory {}".format(cluster_id))
342 self.fs.file_delete(cluster_id, ignore_non_exist=True)
343 # Remove also local directorio if still exist
344 direct = self.fs.path + "/" + cluster_id
345 shutil.rmtree(direct, ignore_errors=True)
346
347 return True
348
349 async def _install_impl(
350 self,
351 cluster_id: str,
352 kdu_model: str,
353 paths: dict,
354 env: dict,
355 kdu_instance: str,
356 atomic: bool = True,
357 timeout: float = 300,
358 params: dict = None,
359 db_dict: dict = None,
360 kdu_name: str = None,
361 namespace: str = None,
362 ):
363 # init env, paths
364 paths, env = self._init_paths_env(
365 cluster_name=cluster_id, create_if_not_exist=True
366 )
367
368 # params to str
369 params_str, file_to_delete = self._params_to_file_option(
370 cluster_id=cluster_id, params=params
371 )
372
373 # version
374 version = None
375 if ":" in kdu_model:
376 parts = kdu_model.split(sep=":")
377 if len(parts) == 2:
378 version = str(parts[1])
379 kdu_model = parts[0]
380
381 repo = self._split_repo(kdu_model)
382 if repo:
383 self.repo_update(cluster_id, repo)
384
385 command = self._get_install_command(
386 kdu_model,
387 kdu_instance,
388 namespace,
389 params_str,
390 version,
391 atomic,
392 timeout,
393 paths["kube_config"],
394 )
395
396 self.log.debug("installing: {}".format(command))
397
398 if atomic:
399 # exec helm in a task
400 exec_task = asyncio.ensure_future(
401 coro_or_future=self._local_async_exec(
402 command=command, raise_exception_on_error=False, env=env
403 )
404 )
405
406 # write status in another task
407 status_task = asyncio.ensure_future(
408 coro_or_future=self._store_status(
409 cluster_id=cluster_id,
410 kdu_instance=kdu_instance,
411 namespace=namespace,
412 db_dict=db_dict,
413 operation="install",
414 )
415 )
416
417 # wait for execution task
418 await asyncio.wait([exec_task])
419
420 # cancel status task
421 status_task.cancel()
422
423 output, rc = exec_task.result()
424
425 else:
426
427 output, rc = await self._local_async_exec(
428 command=command, raise_exception_on_error=False, env=env
429 )
430
431 # remove temporal values yaml file
432 if file_to_delete:
433 os.remove(file_to_delete)
434
435 # write final status
436 await self._store_status(
437 cluster_id=cluster_id,
438 kdu_instance=kdu_instance,
439 namespace=namespace,
440 db_dict=db_dict,
441 operation="install",
442 )
443
444 if rc != 0:
445 msg = "Error executing command: {}\nOutput: {}".format(command, output)
446 self.log.error(msg)
447 raise K8sException(msg)
448
449 async def upgrade(
450 self,
451 cluster_uuid: str,
452 kdu_instance: str,
453 kdu_model: str = None,
454 atomic: bool = True,
455 timeout: float = 300,
456 params: dict = None,
457 db_dict: dict = None,
458 ):
459 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
460 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
461
462 # sync local dir
463 self.fs.sync(from_path=cluster_id)
464
465 # look for instance to obtain namespace
466 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
467 if not instance_info:
468 raise K8sException("kdu_instance {} not found".format(kdu_instance))
469
470 # init env, paths
471 paths, env = self._init_paths_env(
472 cluster_name=cluster_id, create_if_not_exist=True
473 )
474
475 # sync local dir
476 self.fs.sync(from_path=cluster_id)
477
478 # params to str
479 params_str, file_to_delete = self._params_to_file_option(
480 cluster_id=cluster_id, params=params
481 )
482
483 # version
484 version = None
485 if ":" in kdu_model:
486 parts = kdu_model.split(sep=":")
487 if len(parts) == 2:
488 version = str(parts[1])
489 kdu_model = parts[0]
490
491 repo = self._split_repo(kdu_model)
492 if repo:
493 self.repo_update(cluster_id, repo)
494
495 command = self._get_upgrade_command(
496 kdu_model,
497 kdu_instance,
498 instance_info["namespace"],
499 params_str,
500 version,
501 atomic,
502 timeout,
503 paths["kube_config"],
504 )
505
506 self.log.debug("upgrading: {}".format(command))
507
508 if atomic:
509
510 # exec helm in a task
511 exec_task = asyncio.ensure_future(
512 coro_or_future=self._local_async_exec(
513 command=command, raise_exception_on_error=False, env=env
514 )
515 )
516 # write status in another task
517 status_task = asyncio.ensure_future(
518 coro_or_future=self._store_status(
519 cluster_id=cluster_id,
520 kdu_instance=kdu_instance,
521 namespace=instance_info["namespace"],
522 db_dict=db_dict,
523 operation="upgrade",
524 )
525 )
526
527 # wait for execution task
528 await asyncio.wait([exec_task])
529
530 # cancel status task
531 status_task.cancel()
532 output, rc = exec_task.result()
533
534 else:
535
536 output, rc = await self._local_async_exec(
537 command=command, raise_exception_on_error=False, env=env
538 )
539
540 # remove temporal values yaml file
541 if file_to_delete:
542 os.remove(file_to_delete)
543
544 # write final status
545 await self._store_status(
546 cluster_id=cluster_id,
547 kdu_instance=kdu_instance,
548 namespace=instance_info["namespace"],
549 db_dict=db_dict,
550 operation="upgrade",
551 )
552
553 if rc != 0:
554 msg = "Error executing command: {}\nOutput: {}".format(command, output)
555 self.log.error(msg)
556 raise K8sException(msg)
557
558 # sync fs
559 self.fs.reverse_sync(from_path=cluster_id)
560
561 # return new revision number
562 instance = await self.get_instance_info(
563 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
564 )
565 if instance:
566 revision = int(instance.get("revision"))
567 self.log.debug("New revision: {}".format(revision))
568 return revision
569 else:
570 return 0
571
572 async def scale(
573 self,
574 kdu_instance: str,
575 scale: int,
576 resource_name: str,
577 total_timeout: float = 1800,
578 **kwargs,
579 ):
580 raise NotImplementedError("Method not implemented")
581
582 async def get_scale_count(
583 self,
584 resource_name: str,
585 kdu_instance: str,
586 **kwargs,
587 ):
588 raise NotImplementedError("Method not implemented")
589
590 async def rollback(
591 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
592 ):
593
594 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
595 self.log.debug(
596 "rollback kdu_instance {} to revision {} from cluster {}".format(
597 kdu_instance, revision, cluster_id
598 )
599 )
600
601 # sync local dir
602 self.fs.sync(from_path=cluster_id)
603
604 # look for instance to obtain namespace
605 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
606 if not instance_info:
607 raise K8sException("kdu_instance {} not found".format(kdu_instance))
608
609 # init env, paths
610 paths, env = self._init_paths_env(
611 cluster_name=cluster_id, create_if_not_exist=True
612 )
613
614 # sync local dir
615 self.fs.sync(from_path=cluster_id)
616
617 command = self._get_rollback_command(
618 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
619 )
620
621 self.log.debug("rolling_back: {}".format(command))
622
623 # exec helm in a task
624 exec_task = asyncio.ensure_future(
625 coro_or_future=self._local_async_exec(
626 command=command, raise_exception_on_error=False, env=env
627 )
628 )
629 # write status in another task
630 status_task = asyncio.ensure_future(
631 coro_or_future=self._store_status(
632 cluster_id=cluster_id,
633 kdu_instance=kdu_instance,
634 namespace=instance_info["namespace"],
635 db_dict=db_dict,
636 operation="rollback",
637 )
638 )
639
640 # wait for execution task
641 await asyncio.wait([exec_task])
642
643 # cancel status task
644 status_task.cancel()
645
646 output, rc = exec_task.result()
647
648 # write final status
649 await self._store_status(
650 cluster_id=cluster_id,
651 kdu_instance=kdu_instance,
652 namespace=instance_info["namespace"],
653 db_dict=db_dict,
654 operation="rollback",
655 )
656
657 if rc != 0:
658 msg = "Error executing command: {}\nOutput: {}".format(command, output)
659 self.log.error(msg)
660 raise K8sException(msg)
661
662 # sync fs
663 self.fs.reverse_sync(from_path=cluster_id)
664
665 # return new revision number
666 instance = await self.get_instance_info(
667 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
668 )
669 if instance:
670 revision = int(instance.get("revision"))
671 self.log.debug("New revision: {}".format(revision))
672 return revision
673 else:
674 return 0
675
676 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
677 """
678 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
679 (this call should happen after all _terminate-config-primitive_ of the VNF
680 are invoked).
681
682 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
683 :param kdu_instance: unique name for the KDU instance to be deleted
684 :param kwargs: Additional parameters (None yet)
685 :return: True if successful
686 """
687
688 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
689 self.log.debug(
690 "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
691 )
692
693 # sync local dir
694 self.fs.sync(from_path=cluster_id)
695
696 # look for instance to obtain namespace
697 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
698 if not instance_info:
699 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
700 return True
701 # init env, paths
702 paths, env = self._init_paths_env(
703 cluster_name=cluster_id, create_if_not_exist=True
704 )
705
706 # sync local dir
707 self.fs.sync(from_path=cluster_id)
708
709 command = self._get_uninstall_command(
710 kdu_instance, instance_info["namespace"], paths["kube_config"]
711 )
712 output, _rc = await self._local_async_exec(
713 command=command, raise_exception_on_error=True, env=env
714 )
715
716 # sync fs
717 self.fs.reverse_sync(from_path=cluster_id)
718
719 return self._output_to_table(output)
720
721 async def instances_list(self, cluster_uuid: str) -> list:
722 """
723 returns a list of deployed releases in a cluster
724
725 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
726 :return:
727 """
728
729 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
730 self.log.debug("list releases for cluster {}".format(cluster_id))
731
732 # sync local dir
733 self.fs.sync(from_path=cluster_id)
734
735 # execute internal command
736 result = await self._instances_list(cluster_id)
737
738 # sync fs
739 self.fs.reverse_sync(from_path=cluster_id)
740
741 return result
742
743 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
744 instances = await self.instances_list(cluster_uuid=cluster_uuid)
745 for instance in instances:
746 if instance.get("name") == kdu_instance:
747 return instance
748 self.log.debug("Instance {} not found".format(kdu_instance))
749 return None
750
751 async def exec_primitive(
752 self,
753 cluster_uuid: str = None,
754 kdu_instance: str = None,
755 primitive_name: str = None,
756 timeout: float = 300,
757 params: dict = None,
758 db_dict: dict = None,
759 **kwargs,
760 ) -> str:
761 """Exec primitive (Juju action)
762
763 :param cluster_uuid: The UUID of the cluster or namespace:cluster
764 :param kdu_instance: The unique name of the KDU instance
765 :param primitive_name: Name of action that will be executed
766 :param timeout: Timeout for action execution
767 :param params: Dictionary of all the parameters needed for the action
768 :db_dict: Dictionary for any additional data
769 :param kwargs: Additional parameters (None yet)
770
771 :return: Returns the output of the action
772 """
773 raise K8sException(
774 "KDUs deployed with Helm don't support actions "
775 "different from rollback, upgrade and status"
776 )
777
778 async def get_services(
779 self, cluster_uuid: str, kdu_instance: str, namespace: str
780 ) -> list:
781 """
782 Returns a list of services defined for the specified kdu instance.
783
784 :param cluster_uuid: UUID of a K8s cluster known by OSM
785 :param kdu_instance: unique name for the KDU instance
786 :param namespace: K8s namespace used by the KDU instance
787 :return: If successful, it will return a list of services, Each service
788 can have the following data:
789 - `name` of the service
790 - `type` type of service in the k8 cluster
791 - `ports` List of ports offered by the service, for each port includes at least
792 name, port, protocol
793 - `cluster_ip` Internal ip to be used inside k8s cluster
794 - `external_ip` List of external ips (in case they are available)
795 """
796
797 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
798 self.log.debug(
799 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
800 cluster_uuid, kdu_instance
801 )
802 )
803
804 # init env, paths
805 paths, env = self._init_paths_env(
806 cluster_name=cluster_id, create_if_not_exist=True
807 )
808
809 # sync local dir
810 self.fs.sync(from_path=cluster_id)
811
812 # get list of services names for kdu
813 service_names = await self._get_services(
814 cluster_id, kdu_instance, namespace, paths["kube_config"]
815 )
816
817 service_list = []
818 for service in service_names:
819 service = await self._get_service(cluster_id, service, namespace)
820 service_list.append(service)
821
822 # sync fs
823 self.fs.reverse_sync(from_path=cluster_id)
824
825 return service_list
826
827 async def get_service(
828 self, cluster_uuid: str, service_name: str, namespace: str
829 ) -> object:
830
831 self.log.debug(
832 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
833 service_name, namespace, cluster_uuid
834 )
835 )
836
837 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
838
839 # sync local dir
840 self.fs.sync(from_path=cluster_id)
841
842 service = await self._get_service(cluster_id, service_name, namespace)
843
844 # sync fs
845 self.fs.reverse_sync(from_path=cluster_id)
846
847 return service
848
849 async def status_kdu(
850 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
851 ) -> Union[str, dict]:
852 """
853 This call would retrieve tha current state of a given KDU instance. It would be
854 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
855 values_ of the configuration parameters applied to a given instance. This call
856 would be based on the `status` call.
857
858 :param cluster_uuid: UUID of a K8s cluster known by OSM
859 :param kdu_instance: unique name for the KDU instance
860 :param kwargs: Additional parameters (None yet)
861 :param yaml_format: if the return shall be returned as an YAML string or as a
862 dictionary
863 :return: If successful, it will return the following vector of arguments:
864 - K8s `namespace` in the cluster where the KDU lives
865 - `state` of the KDU instance. It can be:
866 - UNKNOWN
867 - DEPLOYED
868 - DELETED
869 - SUPERSEDED
870 - FAILED or
871 - DELETING
872 - List of `resources` (objects) that this release consists of, sorted by kind,
873 and the status of those resources
874 - Last `deployment_time`.
875
876 """
877 self.log.debug(
878 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
879 cluster_uuid, kdu_instance
880 )
881 )
882
883 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
884
885 # sync local dir
886 self.fs.sync(from_path=cluster_id)
887
888 # get instance: needed to obtain namespace
889 instances = await self._instances_list(cluster_id=cluster_id)
890 for instance in instances:
891 if instance.get("name") == kdu_instance:
892 break
893 else:
894 # instance does not exist
895 raise K8sException(
896 "Instance name: {} not found in cluster: {}".format(
897 kdu_instance, cluster_id
898 )
899 )
900
901 status = await self._status_kdu(
902 cluster_id=cluster_id,
903 kdu_instance=kdu_instance,
904 namespace=instance["namespace"],
905 yaml_format=yaml_format,
906 show_error_log=True,
907 )
908
909 # sync fs
910 self.fs.reverse_sync(from_path=cluster_id)
911
912 return status
913
914 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
915
916 self.log.debug(
917 "inspect kdu_model values {} from (optional) repo: {}".format(
918 kdu_model, repo_url
919 )
920 )
921
922 return await self._exec_inspect_comand(
923 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
924 )
925
926 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
927
928 self.log.debug(
929 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
930 )
931
932 return await self._exec_inspect_comand(
933 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
934 )
935
936 async def synchronize_repos(self, cluster_uuid: str):
937
938 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
939 try:
940 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
941 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
942
943 local_repo_list = await self.repo_list(cluster_uuid)
944 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
945
946 deleted_repo_list = []
947 added_repo_dict = {}
948
949 # iterate over the list of repos in the database that should be
950 # added if not present
951 for repo_name, db_repo in db_repo_dict.items():
952 try:
953 # check if it is already present
954 curr_repo_url = local_repo_dict.get(db_repo["name"])
955 repo_id = db_repo.get("_id")
956 if curr_repo_url != db_repo["url"]:
957 if curr_repo_url:
958 self.log.debug(
959 "repo {} url changed, delete and and again".format(
960 db_repo["url"]
961 )
962 )
963 await self.repo_remove(cluster_uuid, db_repo["name"])
964 deleted_repo_list.append(repo_id)
965
966 # add repo
967 self.log.debug("add repo {}".format(db_repo["name"]))
968 await self.repo_add(
969 cluster_uuid, db_repo["name"], db_repo["url"]
970 )
971 added_repo_dict[repo_id] = db_repo["name"]
972 except Exception as e:
973 raise K8sException(
974 "Error adding repo id: {}, err_msg: {} ".format(
975 repo_id, repr(e)
976 )
977 )
978
979 # Delete repos that are present but not in nbi_list
980 for repo_name in local_repo_dict:
981 if not db_repo_dict.get(repo_name) and repo_name != "stable":
982 self.log.debug("delete repo {}".format(repo_name))
983 try:
984 await self.repo_remove(cluster_uuid, repo_name)
985 deleted_repo_list.append(repo_name)
986 except Exception as e:
987 self.warning(
988 "Error deleting repo, name: {}, err_msg: {}".format(
989 repo_name, str(e)
990 )
991 )
992
993 return deleted_repo_list, added_repo_dict
994
995 except K8sException:
996 raise
997 except Exception as e:
998 # Do not raise errors synchronizing repos
999 self.log.error("Error synchronizing repos: {}".format(e))
1000 raise Exception("Error synchronizing repos: {}".format(e))
1001
1002 def _get_db_repos_dict(self, repo_ids: list):
1003 db_repos_dict = {}
1004 for repo_id in repo_ids:
1005 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1006 db_repos_dict[db_repo["name"]] = db_repo
1007 return db_repos_dict
1008
1009 """
1010 ####################################################################################
1011 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1012 ####################################################################################
1013 """
1014
1015 @abc.abstractmethod
1016 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1017 """
1018 Creates and returns base cluster and kube dirs and returns them.
1019 Also created helm3 dirs according to new directory specification, paths are
1020 not returned but assigned to helm environment variables
1021
1022 :param cluster_name: cluster_name
1023 :return: Dictionary with config_paths and dictionary with helm environment variables
1024 """
1025
1026 @abc.abstractmethod
1027 async def _cluster_init(self, cluster_id, namespace, paths, env):
1028 """
1029 Implements the helm version dependent cluster initialization
1030 """
1031
1032 @abc.abstractmethod
1033 async def _instances_list(self, cluster_id):
1034 """
1035 Implements the helm version dependent helm instances list
1036 """
1037
1038 @abc.abstractmethod
1039 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1040 """
1041 Implements the helm version dependent method to obtain services from a helm instance
1042 """
1043
1044 @abc.abstractmethod
1045 async def _status_kdu(
1046 self,
1047 cluster_id: str,
1048 kdu_instance: str,
1049 namespace: str = None,
1050 yaml_format: bool = False,
1051 show_error_log: bool = False,
1052 ) -> Union[str, dict]:
1053 """
1054 Implements the helm version dependent method to obtain status of a helm instance
1055 """
1056
1057 @abc.abstractmethod
1058 def _get_install_command(
1059 self,
1060 kdu_model,
1061 kdu_instance,
1062 namespace,
1063 params_str,
1064 version,
1065 atomic,
1066 timeout,
1067 kubeconfig,
1068 ) -> str:
1069 """
1070 Obtain command to be executed to delete the indicated instance
1071 """
1072
1073 @abc.abstractmethod
1074 def _get_upgrade_command(
1075 self,
1076 kdu_model,
1077 kdu_instance,
1078 namespace,
1079 params_str,
1080 version,
1081 atomic,
1082 timeout,
1083 kubeconfig,
1084 ) -> str:
1085 """
1086 Obtain command to be executed to upgrade the indicated instance
1087 """
1088
1089 @abc.abstractmethod
1090 def _get_rollback_command(
1091 self, kdu_instance, namespace, revision, kubeconfig
1092 ) -> str:
1093 """
1094 Obtain command to be executed to rollback the indicated instance
1095 """
1096
1097 @abc.abstractmethod
1098 def _get_uninstall_command(
1099 self, kdu_instance: str, namespace: str, kubeconfig: str
1100 ) -> str:
1101 """
1102 Obtain command to be executed to delete the indicated instance
1103 """
1104
1105 @abc.abstractmethod
1106 def _get_inspect_command(
1107 self, show_command: str, kdu_model: str, repo_str: str, version: str
1108 ):
1109 """
1110 Obtain command to be executed to obtain information about the kdu
1111 """
1112
1113 @abc.abstractmethod
1114 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1115 """
1116 Method call to uninstall cluster software for helm. This method is dependent
1117 of helm version
1118 For Helm v2 it will be called when Tiller must be uninstalled
1119 For Helm v3 it does nothing and does not need to be callled
1120 """
1121
1122 @abc.abstractmethod
1123 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1124 """
1125 Obtains the cluster repos identifiers
1126 """
1127
1128 """
1129 ####################################################################################
1130 ################################### P R I V A T E ##################################
1131 ####################################################################################
1132 """
1133
1134 @staticmethod
1135 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1136 if os.path.exists(filename):
1137 return True
1138 else:
1139 msg = "File {} does not exist".format(filename)
1140 if exception_if_not_exists:
1141 raise K8sException(msg)
1142
1143 @staticmethod
1144 def _remove_multiple_spaces(strobj):
1145 strobj = strobj.strip()
1146 while " " in strobj:
1147 strobj = strobj.replace(" ", " ")
1148 return strobj
1149
1150 @staticmethod
1151 def _output_to_lines(output: str) -> list:
1152 output_lines = list()
1153 lines = output.splitlines(keepends=False)
1154 for line in lines:
1155 line = line.strip()
1156 if len(line) > 0:
1157 output_lines.append(line)
1158 return output_lines
1159
1160 @staticmethod
1161 def _output_to_table(output: str) -> list:
1162 output_table = list()
1163 lines = output.splitlines(keepends=False)
1164 for line in lines:
1165 line = line.replace("\t", " ")
1166 line_list = list()
1167 output_table.append(line_list)
1168 cells = line.split(sep=" ")
1169 for cell in cells:
1170 cell = cell.strip()
1171 if len(cell) > 0:
1172 line_list.append(cell)
1173 return output_table
1174
1175 @staticmethod
1176 def _parse_services(output: str) -> list:
1177 lines = output.splitlines(keepends=False)
1178 services = []
1179 for line in lines:
1180 line = line.replace("\t", " ")
1181 cells = line.split(sep=" ")
1182 if len(cells) > 0 and cells[0].startswith("service/"):
1183 elems = cells[0].split(sep="/")
1184 if len(elems) > 1:
1185 services.append(elems[1])
1186 return services
1187
1188 @staticmethod
1189 def _get_deep(dictionary: dict, members: tuple):
1190 target = dictionary
1191 value = None
1192 try:
1193 for m in members:
1194 value = target.get(m)
1195 if not value:
1196 return None
1197 else:
1198 target = value
1199 except Exception:
1200 pass
1201 return value
1202
1203 # find key:value in several lines
1204 @staticmethod
1205 def _find_in_lines(p_lines: list, p_key: str) -> str:
1206 for line in p_lines:
1207 try:
1208 if line.startswith(p_key + ":"):
1209 parts = line.split(":")
1210 the_value = parts[1].strip()
1211 return the_value
1212 except Exception:
1213 # ignore it
1214 pass
1215 return None
1216
1217 @staticmethod
1218 def _lower_keys_list(input_list: list):
1219 """
1220 Transform the keys in a list of dictionaries to lower case and returns a new list
1221 of dictionaries
1222 """
1223 new_list = []
1224 if input_list:
1225 for dictionary in input_list:
1226 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1227 new_list.append(new_dict)
1228 return new_list
1229
1230 async def _local_async_exec(
1231 self,
1232 command: str,
1233 raise_exception_on_error: bool = False,
1234 show_error_log: bool = True,
1235 encode_utf8: bool = False,
1236 env: dict = None,
1237 ) -> (str, int):
1238
1239 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1240 self.log.debug(
1241 "Executing async local command: {}, env: {}".format(command, env)
1242 )
1243
1244 # split command
1245 command = shlex.split(command)
1246
1247 environ = os.environ.copy()
1248 if env:
1249 environ.update(env)
1250
1251 try:
1252 process = await asyncio.create_subprocess_exec(
1253 *command,
1254 stdout=asyncio.subprocess.PIPE,
1255 stderr=asyncio.subprocess.PIPE,
1256 env=environ,
1257 )
1258
1259 # wait for command terminate
1260 stdout, stderr = await process.communicate()
1261
1262 return_code = process.returncode
1263
1264 output = ""
1265 if stdout:
1266 output = stdout.decode("utf-8").strip()
1267 # output = stdout.decode()
1268 if stderr:
1269 output = stderr.decode("utf-8").strip()
1270 # output = stderr.decode()
1271
1272 if return_code != 0 and show_error_log:
1273 self.log.debug(
1274 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1275 )
1276 else:
1277 self.log.debug("Return code: {}".format(return_code))
1278
1279 if raise_exception_on_error and return_code != 0:
1280 raise K8sException(output)
1281
1282 if encode_utf8:
1283 output = output.encode("utf-8").strip()
1284 output = str(output).replace("\\n", "\n")
1285
1286 return output, return_code
1287
1288 except asyncio.CancelledError:
1289 raise
1290 except K8sException:
1291 raise
1292 except Exception as e:
1293 msg = "Exception executing command: {} -> {}".format(command, e)
1294 self.log.error(msg)
1295 if raise_exception_on_error:
1296 raise K8sException(e) from e
1297 else:
1298 return "", -1
1299
1300 async def _local_async_exec_pipe(
1301 self,
1302 command1: str,
1303 command2: str,
1304 raise_exception_on_error: bool = True,
1305 show_error_log: bool = True,
1306 encode_utf8: bool = False,
1307 env: dict = None,
1308 ):
1309
1310 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1311 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1312 command = "{} | {}".format(command1, command2)
1313 self.log.debug(
1314 "Executing async local command: {}, env: {}".format(command, env)
1315 )
1316
1317 # split command
1318 command1 = shlex.split(command1)
1319 command2 = shlex.split(command2)
1320
1321 environ = os.environ.copy()
1322 if env:
1323 environ.update(env)
1324
1325 try:
1326 read, write = os.pipe()
1327 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1328 os.close(write)
1329 process_2 = await asyncio.create_subprocess_exec(
1330 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1331 )
1332 os.close(read)
1333 stdout, stderr = await process_2.communicate()
1334
1335 return_code = process_2.returncode
1336
1337 output = ""
1338 if stdout:
1339 output = stdout.decode("utf-8").strip()
1340 # output = stdout.decode()
1341 if stderr:
1342 output = stderr.decode("utf-8").strip()
1343 # output = stderr.decode()
1344
1345 if return_code != 0 and show_error_log:
1346 self.log.debug(
1347 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1348 )
1349 else:
1350 self.log.debug("Return code: {}".format(return_code))
1351
1352 if raise_exception_on_error and return_code != 0:
1353 raise K8sException(output)
1354
1355 if encode_utf8:
1356 output = output.encode("utf-8").strip()
1357 output = str(output).replace("\\n", "\n")
1358
1359 return output, return_code
1360 except asyncio.CancelledError:
1361 raise
1362 except K8sException:
1363 raise
1364 except Exception as e:
1365 msg = "Exception executing command: {} -> {}".format(command, e)
1366 self.log.error(msg)
1367 if raise_exception_on_error:
1368 raise K8sException(e) from e
1369 else:
1370 return "", -1
1371
1372 async def _get_service(self, cluster_id, service_name, namespace):
1373 """
1374 Obtains the data of the specified service in the k8cluster.
1375
1376 :param cluster_id: id of a K8s cluster known by OSM
1377 :param service_name: name of the K8s service in the specified namespace
1378 :param namespace: K8s namespace used by the KDU instance
1379 :return: If successful, it will return a service with the following data:
1380 - `name` of the service
1381 - `type` type of service in the k8 cluster
1382 - `ports` List of ports offered by the service, for each port includes at least
1383 name, port, protocol
1384 - `cluster_ip` Internal ip to be used inside k8s cluster
1385 - `external_ip` List of external ips (in case they are available)
1386 """
1387
1388 # init config, env
1389 paths, env = self._init_paths_env(
1390 cluster_name=cluster_id, create_if_not_exist=True
1391 )
1392
1393 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1394 self.kubectl_command, paths["kube_config"], namespace, service_name
1395 )
1396
1397 output, _rc = await self._local_async_exec(
1398 command=command, raise_exception_on_error=True, env=env
1399 )
1400
1401 data = yaml.load(output, Loader=yaml.SafeLoader)
1402
1403 service = {
1404 "name": service_name,
1405 "type": self._get_deep(data, ("spec", "type")),
1406 "ports": self._get_deep(data, ("spec", "ports")),
1407 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1408 }
1409 if service["type"] == "LoadBalancer":
1410 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1411 ip_list = [elem["ip"] for elem in ip_map_list]
1412 service["external_ip"] = ip_list
1413
1414 return service
1415
1416 async def _exec_inspect_comand(
1417 self, inspect_command: str, kdu_model: str, repo_url: str = None
1418 ):
1419 """
1420 Obtains information about a kdu, no cluster (no env)
1421 """
1422
1423 repo_str = ""
1424 if repo_url:
1425 repo_str = " --repo {}".format(repo_url)
1426
1427 idx = kdu_model.find("/")
1428 if idx >= 0:
1429 idx += 1
1430 kdu_model = kdu_model[idx:]
1431
1432 version = ""
1433 if ":" in kdu_model:
1434 parts = kdu_model.split(sep=":")
1435 if len(parts) == 2:
1436 version = "--version {}".format(str(parts[1]))
1437 kdu_model = parts[0]
1438
1439 full_command = self._get_inspect_command(
1440 inspect_command, kdu_model, repo_str, version
1441 )
1442 output, _rc = await self._local_async_exec(
1443 command=full_command, encode_utf8=True
1444 )
1445
1446 return output
1447
1448 async def _store_status(
1449 self,
1450 cluster_id: str,
1451 operation: str,
1452 kdu_instance: str,
1453 namespace: str = None,
1454 db_dict: dict = None,
1455 ) -> None:
1456 """
1457 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1458
1459 :param cluster_id (str): the cluster where the KDU instance is deployed
1460 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1461 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1462 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1463 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1464 values for the keys:
1465 - "collection": The Mongo DB collection to write to
1466 - "filter": The query filter to use in the update process
1467 - "path": The dot separated keys which targets the object to be updated
1468 Defaults to None.
1469 """
1470
1471 try:
1472 detailed_status = await self._status_kdu(
1473 cluster_id=cluster_id,
1474 kdu_instance=kdu_instance,
1475 yaml_format=False,
1476 namespace=namespace,
1477 )
1478
1479 status = detailed_status.get("info").get("description")
1480 self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")
1481
1482 # write status to db
1483 result = await self.write_app_status_to_db(
1484 db_dict=db_dict,
1485 status=str(status),
1486 detailed_status=str(detailed_status),
1487 operation=operation,
1488 )
1489
1490 if not result:
1491 self.log.info("Error writing in database. Task exiting...")
1492
1493 except asyncio.CancelledError as e:
1494 self.log.warning(
1495 f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1496 )
1497 except Exception as e:
1498 self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1499
1500 # params for use in -f file
1501 # returns values file option and filename (in order to delete it at the end)
1502 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1503
1504 if params and len(params) > 0:
1505 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1506
1507 def get_random_number():
1508 r = random.randrange(start=1, stop=99999999)
1509 s = str(r)
1510 while len(s) < 10:
1511 s = "0" + s
1512 return s
1513
1514 params2 = dict()
1515 for key in params:
1516 value = params.get(key)
1517 if "!!yaml" in str(value):
1518 value = yaml.load(value[7:])
1519 params2[key] = value
1520
1521 values_file = get_random_number() + ".yaml"
1522 with open(values_file, "w") as stream:
1523 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1524
1525 return "-f {}".format(values_file), values_file
1526
1527 return "", None
1528
1529 # params for use in --set option
1530 @staticmethod
1531 def _params_to_set_option(params: dict) -> str:
1532 params_str = ""
1533 if params and len(params) > 0:
1534 start = True
1535 for key in params:
1536 value = params.get(key, None)
1537 if value is not None:
1538 if start:
1539 params_str += "--set "
1540 start = False
1541 else:
1542 params_str += ","
1543 params_str += "{}={}".format(key, value)
1544 return params_str
1545
1546 @staticmethod
1547 def generate_kdu_instance_name(**kwargs):
1548 chart_name = kwargs["kdu_model"]
1549 # check embeded chart (file or dir)
1550 if chart_name.startswith("/"):
1551 # extract file or directory name
1552 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1553 # check URL
1554 elif "://" in chart_name:
1555 # extract last portion of URL
1556 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1557
1558 name = ""
1559 for c in chart_name:
1560 if c.isalpha() or c.isnumeric():
1561 name += c
1562 else:
1563 name += "-"
1564 if len(name) > 35:
1565 name = name[0:35]
1566
1567 # if does not start with alpha character, prefix 'a'
1568 if not name[0].isalpha():
1569 name = "a" + name
1570
1571 name += "-"
1572
1573 def get_random_number():
1574 r = random.randrange(start=1, stop=99999999)
1575 s = str(r)
1576 s = s.rjust(10, "0")
1577 return s
1578
1579 name = name + get_random_number()
1580 return name.lower()
1581
1582 async def _split_repo(self, kdu_model: str) -> str:
1583 repo_name = None
1584 idx = kdu_model.find("/")
1585 if idx >= 0:
1586 repo_name = kdu_model[:idx]
1587 return repo_name