Revert "Bug 1964 fixed: removed the variable cluster_uuid from init_env method"
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 from typing import Union
25 import random
26 import time
27 import shlex
28 import shutil
29 import stat
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
49 def __init__(
50 self,
51 fs: object,
52 db: object,
53 kubectl_command: str = "/usr/bin/kubectl",
54 helm_command: str = "/usr/bin/helm",
55 log: object = None,
56 on_update_db=None,
57 ):
58 """
59
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
64 :param log: logger
65 :param on_update_db: callback called when k8s connector updates database
66 """
67
68 # parent class
69 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
70
71 self.log.info("Initializing K8S Helm connector")
72
73 self.config = EnvironConfig()
74 # random numbers for release name generation
75 random.seed(time.time())
76
77 # the file system
78 self.fs = fs
79
80 # exception if kubectl is not installed
81 self.kubectl_command = kubectl_command
82 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
83
84 # exception if helm is not installed
85 self._helm_command = helm_command
86 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
87
88 # obtain stable repo url from config or apply default
89 self._stable_repo_url = self.config.get("stablerepourl")
90 if self._stable_repo_url == "None":
91 self._stable_repo_url = None
92
93 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95 """
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
98 """
99 namespace, _, cluster_id = cluster_uuid.rpartition(":")
100 return namespace, cluster_id
101
102 async def init_env(
103 self,
104 k8s_creds: str,
105 namespace: str = "kube-system",
106 reuse_cluster_uuid=None,
107 **kwargs,
108 ) -> (str, bool):
109 """
110 It prepares a given K8s cluster environment to run Charts
111
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
113 '.kube/config'
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
117 :param kwargs: Additional parameters (None yet)
118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
121 """
122
123 if reuse_cluster_uuid:
124 namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
125 namespace = namespace_ or namespace
126 else:
127 cluster_id = str(uuid4())
128 cluster_uuid = "{}:{}".format(namespace, cluster_id)
129
130 self.log.debug(
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
132 )
133
134 paths, env = self._init_paths_env(
135 cluster_name=cluster_id, create_if_not_exist=True
136 )
137 mode = stat.S_IRUSR | stat.S_IWUSR
138 with open(paths["kube_config"], "w", mode) as f:
139 f.write(k8s_creds)
140 os.chmod(paths["kube_config"], 0o600)
141
142 # Code with initialization specific of helm version
143 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
144
145 # sync fs with local data
146 self.fs.reverse_sync(from_path=cluster_id)
147
148 self.log.info("Cluster {} initialized".format(cluster_id))
149
150 return cluster_uuid, n2vc_installed_sw
151
152 async def repo_add(
153 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
154 ):
155 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
156 self.log.debug(
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_id, repo_type, name, url
159 )
160 )
161
162 # init_env
163 paths, env = self._init_paths_env(
164 cluster_name=cluster_id, create_if_not_exist=True
165 )
166
167 # sync local dir
168 self.fs.sync(from_path=cluster_id)
169
170 # helm repo add name url
171 command = "env KUBECONFIG={} {} repo add {} {}".format(
172 paths["kube_config"], self._helm_command, name, url
173 )
174 self.log.debug("adding repo: {}".format(command))
175 await self._local_async_exec(
176 command=command, raise_exception_on_error=True, env=env
177 )
178
179 # helm repo update
180 command = "env KUBECONFIG={} {} repo update {}".format(
181 paths["kube_config"], self._helm_command, name
182 )
183 self.log.debug("updating repo: {}".format(command))
184 await self._local_async_exec(
185 command=command, raise_exception_on_error=False, env=env
186 )
187
188 # sync fs
189 self.fs.reverse_sync(from_path=cluster_uuid)
190
191 async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
192 self.log.debug(
193 "Cluster {}, updating {} repository {}".format(
194 cluster_uuid, repo_type, name
195 )
196 )
197
198 # init_env
199 paths, env = self._init_paths_env(
200 cluster_name=cluster_uuid, create_if_not_exist=True
201 )
202
203 # sync local dir
204 self.fs.sync(from_path=cluster_uuid)
205
206 # helm repo update
207 command = "{} repo update {}".format(self._helm_command, name)
208 self.log.debug("updating repo: {}".format(command))
209 await self._local_async_exec(
210 command=command, raise_exception_on_error=False, env=env
211 )
212
213 # sync fs
214 self.fs.reverse_sync(from_path=cluster_uuid)
215
216 async def repo_list(self, cluster_uuid: str) -> list:
217 """
218 Get the list of registered repositories
219
220 :return: list of registered repositories: [ (name, url) .... ]
221 """
222
223 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
224 self.log.debug("list repositories for cluster {}".format(cluster_id))
225
226 # config filename
227 paths, env = self._init_paths_env(
228 cluster_name=cluster_id, create_if_not_exist=True
229 )
230
231 # sync local dir
232 self.fs.sync(from_path=cluster_id)
233
234 command = "env KUBECONFIG={} {} repo list --output yaml".format(
235 paths["kube_config"], self._helm_command
236 )
237
238 # Set exception to false because if there are no repos just want an empty list
239 output, _rc = await self._local_async_exec(
240 command=command, raise_exception_on_error=False, env=env
241 )
242
243 # sync fs
244 self.fs.reverse_sync(from_path=cluster_id)
245
246 if _rc == 0:
247 if output and len(output) > 0:
248 repos = yaml.load(output, Loader=yaml.SafeLoader)
249 # unify format between helm2 and helm3 setting all keys lowercase
250 return self._lower_keys_list(repos)
251 else:
252 return []
253 else:
254 return []
255
256 async def repo_remove(self, cluster_uuid: str, name: str):
257
258 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
259 self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
260
261 # init env, paths
262 paths, env = self._init_paths_env(
263 cluster_name=cluster_id, create_if_not_exist=True
264 )
265
266 # sync local dir
267 self.fs.sync(from_path=cluster_id)
268
269 command = "env KUBECONFIG={} {} repo remove {}".format(
270 paths["kube_config"], self._helm_command, name
271 )
272 await self._local_async_exec(
273 command=command, raise_exception_on_error=True, env=env
274 )
275
276 # sync fs
277 self.fs.reverse_sync(from_path=cluster_id)
278
279 async def reset(
280 self,
281 cluster_uuid: str,
282 force: bool = False,
283 uninstall_sw: bool = False,
284 **kwargs,
285 ) -> bool:
286 """Reset a cluster
287
288 Resets the Kubernetes cluster by removing the helm deployment that represents it.
289
290 :param cluster_uuid: The UUID of the cluster to reset
291 :param force: Boolean to force the reset
292 :param uninstall_sw: Boolean to force the reset
293 :param kwargs: Additional parameters (None yet)
294 :return: Returns True if successful or raises an exception.
295 """
296 namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
297 self.log.debug(
298 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
299 cluster_id, uninstall_sw
300 )
301 )
302
303 # sync local dir
304 self.fs.sync(from_path=cluster_id)
305
306 # uninstall releases if needed.
307 if uninstall_sw:
308 releases = await self.instances_list(cluster_uuid=cluster_uuid)
309 if len(releases) > 0:
310 if force:
311 for r in releases:
312 try:
313 kdu_instance = r.get("name")
314 chart = r.get("chart")
315 self.log.debug(
316 "Uninstalling {} -> {}".format(chart, kdu_instance)
317 )
318 await self.uninstall(
319 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
320 )
321 except Exception as e:
322 # will not raise exception as it was found
323 # that in some cases of previously installed helm releases it
324 # raised an error
325 self.log.warn(
326 "Error uninstalling release {}: {}".format(
327 kdu_instance, e
328 )
329 )
330 else:
331 msg = (
332 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
333 ).format(cluster_id)
334 self.log.warn(msg)
335 uninstall_sw = (
336 False # Allow to remove k8s cluster without removing Tiller
337 )
338
339 if uninstall_sw:
340 await self._uninstall_sw(cluster_id, namespace)
341
342 # delete cluster directory
343 self.log.debug("Removing directory {}".format(cluster_id))
344 self.fs.file_delete(cluster_id, ignore_non_exist=True)
345 # Remove also local directorio if still exist
346 direct = self.fs.path + "/" + cluster_id
347 shutil.rmtree(direct, ignore_errors=True)
348
349 return True
350
351 async def _install_impl(
352 self,
353 cluster_id: str,
354 kdu_model: str,
355 paths: dict,
356 env: dict,
357 kdu_instance: str,
358 atomic: bool = True,
359 timeout: float = 300,
360 params: dict = None,
361 db_dict: dict = None,
362 kdu_name: str = None,
363 namespace: str = None,
364 ):
365 # init env, paths
366 paths, env = self._init_paths_env(
367 cluster_name=cluster_id, create_if_not_exist=True
368 )
369
370 # params to str
371 params_str, file_to_delete = self._params_to_file_option(
372 cluster_id=cluster_id, params=params
373 )
374
375 # version
376 version = None
377 if ":" in kdu_model:
378 parts = kdu_model.split(sep=":")
379 if len(parts) == 2:
380 version = str(parts[1])
381 kdu_model = parts[0]
382
383 repo = self._split_repo(kdu_model)
384 if repo:
385 self.repo_update(cluster_id, repo)
386
387 command = self._get_install_command(
388 kdu_model,
389 kdu_instance,
390 namespace,
391 params_str,
392 version,
393 atomic,
394 timeout,
395 paths["kube_config"],
396 )
397
398 self.log.debug("installing: {}".format(command))
399
400 if atomic:
401 # exec helm in a task
402 exec_task = asyncio.ensure_future(
403 coro_or_future=self._local_async_exec(
404 command=command, raise_exception_on_error=False, env=env
405 )
406 )
407
408 # write status in another task
409 status_task = asyncio.ensure_future(
410 coro_or_future=self._store_status(
411 cluster_id=cluster_id,
412 kdu_instance=kdu_instance,
413 namespace=namespace,
414 db_dict=db_dict,
415 operation="install",
416 run_once=False,
417 )
418 )
419
420 # wait for execution task
421 await asyncio.wait([exec_task])
422
423 # cancel status task
424 status_task.cancel()
425
426 output, rc = exec_task.result()
427
428 else:
429
430 output, rc = await self._local_async_exec(
431 command=command, raise_exception_on_error=False, env=env
432 )
433
434 # remove temporal values yaml file
435 if file_to_delete:
436 os.remove(file_to_delete)
437
438 # write final status
439 await self._store_status(
440 cluster_id=cluster_id,
441 kdu_instance=kdu_instance,
442 namespace=namespace,
443 db_dict=db_dict,
444 operation="install",
445 run_once=True,
446 check_every=0,
447 )
448
449 if rc != 0:
450 msg = "Error executing command: {}\nOutput: {}".format(command, output)
451 self.log.error(msg)
452 raise K8sException(msg)
453
454 async def upgrade(
455 self,
456 cluster_uuid: str,
457 kdu_instance: str,
458 kdu_model: str = None,
459 atomic: bool = True,
460 timeout: float = 300,
461 params: dict = None,
462 db_dict: dict = None,
463 ):
464 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
465 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
466
467 # sync local dir
468 self.fs.sync(from_path=cluster_id)
469
470 # look for instance to obtain namespace
471 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
472 if not instance_info:
473 raise K8sException("kdu_instance {} not found".format(kdu_instance))
474
475 # init env, paths
476 paths, env = self._init_paths_env(
477 cluster_name=cluster_id, create_if_not_exist=True
478 )
479
480 # sync local dir
481 self.fs.sync(from_path=cluster_id)
482
483 # params to str
484 params_str, file_to_delete = self._params_to_file_option(
485 cluster_id=cluster_id, params=params
486 )
487
488 # version
489 version = None
490 if ":" in kdu_model:
491 parts = kdu_model.split(sep=":")
492 if len(parts) == 2:
493 version = str(parts[1])
494 kdu_model = parts[0]
495
496 repo = self._split_repo(kdu_model)
497 if repo:
498 self.repo_update(cluster_uuid, repo)
499
500 command = self._get_upgrade_command(
501 kdu_model,
502 kdu_instance,
503 instance_info["namespace"],
504 params_str,
505 version,
506 atomic,
507 timeout,
508 paths["kube_config"],
509 )
510
511 self.log.debug("upgrading: {}".format(command))
512
513 if atomic:
514
515 # exec helm in a task
516 exec_task = asyncio.ensure_future(
517 coro_or_future=self._local_async_exec(
518 command=command, raise_exception_on_error=False, env=env
519 )
520 )
521 # write status in another task
522 status_task = asyncio.ensure_future(
523 coro_or_future=self._store_status(
524 cluster_id=cluster_id,
525 kdu_instance=kdu_instance,
526 namespace=instance_info["namespace"],
527 db_dict=db_dict,
528 operation="upgrade",
529 run_once=False,
530 )
531 )
532
533 # wait for execution task
534 await asyncio.wait([exec_task])
535
536 # cancel status task
537 status_task.cancel()
538 output, rc = exec_task.result()
539
540 else:
541
542 output, rc = await self._local_async_exec(
543 command=command, raise_exception_on_error=False, env=env
544 )
545
546 # remove temporal values yaml file
547 if file_to_delete:
548 os.remove(file_to_delete)
549
550 # write final status
551 await self._store_status(
552 cluster_id=cluster_id,
553 kdu_instance=kdu_instance,
554 namespace=instance_info["namespace"],
555 db_dict=db_dict,
556 operation="upgrade",
557 run_once=True,
558 check_every=0,
559 )
560
561 if rc != 0:
562 msg = "Error executing command: {}\nOutput: {}".format(command, output)
563 self.log.error(msg)
564 raise K8sException(msg)
565
566 # sync fs
567 self.fs.reverse_sync(from_path=cluster_id)
568
569 # return new revision number
570 instance = await self.get_instance_info(
571 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
572 )
573 if instance:
574 revision = int(instance.get("revision"))
575 self.log.debug("New revision: {}".format(revision))
576 return revision
577 else:
578 return 0
579
580 async def scale(
581 self,
582 kdu_instance: str,
583 scale: int,
584 resource_name: str,
585 total_timeout: float = 1800,
586 **kwargs,
587 ):
588 raise NotImplementedError("Method not implemented")
589
590 async def get_scale_count(
591 self,
592 resource_name: str,
593 kdu_instance: str,
594 **kwargs,
595 ):
596 raise NotImplementedError("Method not implemented")
597
598 async def rollback(
599 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
600 ):
601
602 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
603 self.log.debug(
604 "rollback kdu_instance {} to revision {} from cluster {}".format(
605 kdu_instance, revision, cluster_id
606 )
607 )
608
609 # sync local dir
610 self.fs.sync(from_path=cluster_id)
611
612 # look for instance to obtain namespace
613 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
614 if not instance_info:
615 raise K8sException("kdu_instance {} not found".format(kdu_instance))
616
617 # init env, paths
618 paths, env = self._init_paths_env(
619 cluster_name=cluster_id, create_if_not_exist=True
620 )
621
622 # sync local dir
623 self.fs.sync(from_path=cluster_id)
624
625 command = self._get_rollback_command(
626 kdu_instance, instance_info["namespace"], revision, paths["kube_config"]
627 )
628
629 self.log.debug("rolling_back: {}".format(command))
630
631 # exec helm in a task
632 exec_task = asyncio.ensure_future(
633 coro_or_future=self._local_async_exec(
634 command=command, raise_exception_on_error=False, env=env
635 )
636 )
637 # write status in another task
638 status_task = asyncio.ensure_future(
639 coro_or_future=self._store_status(
640 cluster_id=cluster_id,
641 kdu_instance=kdu_instance,
642 namespace=instance_info["namespace"],
643 db_dict=db_dict,
644 operation="rollback",
645 run_once=False,
646 )
647 )
648
649 # wait for execution task
650 await asyncio.wait([exec_task])
651
652 # cancel status task
653 status_task.cancel()
654
655 output, rc = exec_task.result()
656
657 # write final status
658 await self._store_status(
659 cluster_id=cluster_id,
660 kdu_instance=kdu_instance,
661 namespace=instance_info["namespace"],
662 db_dict=db_dict,
663 operation="rollback",
664 run_once=True,
665 check_every=0,
666 )
667
668 if rc != 0:
669 msg = "Error executing command: {}\nOutput: {}".format(command, output)
670 self.log.error(msg)
671 raise K8sException(msg)
672
673 # sync fs
674 self.fs.reverse_sync(from_path=cluster_id)
675
676 # return new revision number
677 instance = await self.get_instance_info(
678 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
679 )
680 if instance:
681 revision = int(instance.get("revision"))
682 self.log.debug("New revision: {}".format(revision))
683 return revision
684 else:
685 return 0
686
687 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
688 """
689 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
690 (this call should happen after all _terminate-config-primitive_ of the VNF
691 are invoked).
692
693 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
694 :param kdu_instance: unique name for the KDU instance to be deleted
695 :param kwargs: Additional parameters (None yet)
696 :return: True if successful
697 """
698
699 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
700 self.log.debug(
701 "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
702 )
703
704 # sync local dir
705 self.fs.sync(from_path=cluster_id)
706
707 # look for instance to obtain namespace
708 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
709 if not instance_info:
710 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
711 return True
712 # init env, paths
713 paths, env = self._init_paths_env(
714 cluster_name=cluster_id, create_if_not_exist=True
715 )
716
717 # sync local dir
718 self.fs.sync(from_path=cluster_id)
719
720 command = self._get_uninstall_command(
721 kdu_instance, instance_info["namespace"], paths["kube_config"]
722 )
723 output, _rc = await self._local_async_exec(
724 command=command, raise_exception_on_error=True, env=env
725 )
726
727 # sync fs
728 self.fs.reverse_sync(from_path=cluster_id)
729
730 return self._output_to_table(output)
731
732 async def instances_list(self, cluster_uuid: str) -> list:
733 """
734 returns a list of deployed releases in a cluster
735
736 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
737 :return:
738 """
739
740 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
741 self.log.debug("list releases for cluster {}".format(cluster_id))
742
743 # sync local dir
744 self.fs.sync(from_path=cluster_id)
745
746 # execute internal command
747 result = await self._instances_list(cluster_id)
748
749 # sync fs
750 self.fs.reverse_sync(from_path=cluster_id)
751
752 return result
753
754 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
755 instances = await self.instances_list(cluster_uuid=cluster_uuid)
756 for instance in instances:
757 if instance.get("name") == kdu_instance:
758 return instance
759 self.log.debug("Instance {} not found".format(kdu_instance))
760 return None
761
762 async def exec_primitive(
763 self,
764 cluster_uuid: str = None,
765 kdu_instance: str = None,
766 primitive_name: str = None,
767 timeout: float = 300,
768 params: dict = None,
769 db_dict: dict = None,
770 **kwargs,
771 ) -> str:
772 """Exec primitive (Juju action)
773
774 :param cluster_uuid: The UUID of the cluster or namespace:cluster
775 :param kdu_instance: The unique name of the KDU instance
776 :param primitive_name: Name of action that will be executed
777 :param timeout: Timeout for action execution
778 :param params: Dictionary of all the parameters needed for the action
779 :db_dict: Dictionary for any additional data
780 :param kwargs: Additional parameters (None yet)
781
782 :return: Returns the output of the action
783 """
784 raise K8sException(
785 "KDUs deployed with Helm don't support actions "
786 "different from rollback, upgrade and status"
787 )
788
789 async def get_services(
790 self, cluster_uuid: str, kdu_instance: str, namespace: str
791 ) -> list:
792 """
793 Returns a list of services defined for the specified kdu instance.
794
795 :param cluster_uuid: UUID of a K8s cluster known by OSM
796 :param kdu_instance: unique name for the KDU instance
797 :param namespace: K8s namespace used by the KDU instance
798 :return: If successful, it will return a list of services, Each service
799 can have the following data:
800 - `name` of the service
801 - `type` type of service in the k8 cluster
802 - `ports` List of ports offered by the service, for each port includes at least
803 name, port, protocol
804 - `cluster_ip` Internal ip to be used inside k8s cluster
805 - `external_ip` List of external ips (in case they are available)
806 """
807
808 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
809 self.log.debug(
810 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
811 cluster_uuid, kdu_instance
812 )
813 )
814
815 # init env, paths
816 paths, env = self._init_paths_env(
817 cluster_name=cluster_id, create_if_not_exist=True
818 )
819
820 # sync local dir
821 self.fs.sync(from_path=cluster_id)
822
823 # get list of services names for kdu
824 service_names = await self._get_services(
825 cluster_id, kdu_instance, namespace, paths["kube_config"]
826 )
827
828 service_list = []
829 for service in service_names:
830 service = await self._get_service(cluster_id, service, namespace)
831 service_list.append(service)
832
833 # sync fs
834 self.fs.reverse_sync(from_path=cluster_id)
835
836 return service_list
837
838 async def get_service(
839 self, cluster_uuid: str, service_name: str, namespace: str
840 ) -> object:
841
842 self.log.debug(
843 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
844 service_name, namespace, cluster_uuid
845 )
846 )
847
848 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
849
850 # sync local dir
851 self.fs.sync(from_path=cluster_id)
852
853 service = await self._get_service(cluster_id, service_name, namespace)
854
855 # sync fs
856 self.fs.reverse_sync(from_path=cluster_id)
857
858 return service
859
860 async def status_kdu(
861 self, cluster_uuid: str, kdu_instance: str, yaml_format: str = False, **kwargs
862 ) -> Union[str, dict]:
863 """
864 This call would retrieve tha current state of a given KDU instance. It would be
865 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
866 values_ of the configuration parameters applied to a given instance. This call
867 would be based on the `status` call.
868
869 :param cluster_uuid: UUID of a K8s cluster known by OSM
870 :param kdu_instance: unique name for the KDU instance
871 :param kwargs: Additional parameters (None yet)
872 :param yaml_format: if the return shall be returned as an YAML string or as a
873 dictionary
874 :return: If successful, it will return the following vector of arguments:
875 - K8s `namespace` in the cluster where the KDU lives
876 - `state` of the KDU instance. It can be:
877 - UNKNOWN
878 - DEPLOYED
879 - DELETED
880 - SUPERSEDED
881 - FAILED or
882 - DELETING
883 - List of `resources` (objects) that this release consists of, sorted by kind,
884 and the status of those resources
885 - Last `deployment_time`.
886
887 """
888 self.log.debug(
889 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
890 cluster_uuid, kdu_instance
891 )
892 )
893
894 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
895
896 # sync local dir
897 self.fs.sync(from_path=cluster_id)
898
899 # get instance: needed to obtain namespace
900 instances = await self._instances_list(cluster_id=cluster_id)
901 for instance in instances:
902 if instance.get("name") == kdu_instance:
903 break
904 else:
905 # instance does not exist
906 raise K8sException(
907 "Instance name: {} not found in cluster: {}".format(
908 kdu_instance, cluster_id
909 )
910 )
911
912 status = await self._status_kdu(
913 cluster_id=cluster_id,
914 kdu_instance=kdu_instance,
915 namespace=instance["namespace"],
916 yaml_format=yaml_format,
917 show_error_log=True,
918 )
919
920 # sync fs
921 self.fs.reverse_sync(from_path=cluster_id)
922
923 return status
924
925 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
926
927 self.log.debug(
928 "inspect kdu_model values {} from (optional) repo: {}".format(
929 kdu_model, repo_url
930 )
931 )
932
933 return await self._exec_inspect_comand(
934 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
935 )
936
937 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
938
939 self.log.debug(
940 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
941 )
942
943 return await self._exec_inspect_comand(
944 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
945 )
946
947 async def synchronize_repos(self, cluster_uuid: str):
948
949 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
950 try:
951 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
952 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
953
954 local_repo_list = await self.repo_list(cluster_uuid)
955 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
956
957 deleted_repo_list = []
958 added_repo_dict = {}
959
960 # iterate over the list of repos in the database that should be
961 # added if not present
962 for repo_name, db_repo in db_repo_dict.items():
963 try:
964 # check if it is already present
965 curr_repo_url = local_repo_dict.get(db_repo["name"])
966 repo_id = db_repo.get("_id")
967 if curr_repo_url != db_repo["url"]:
968 if curr_repo_url:
969 self.log.debug(
970 "repo {} url changed, delete and and again".format(
971 db_repo["url"]
972 )
973 )
974 await self.repo_remove(cluster_uuid, db_repo["name"])
975 deleted_repo_list.append(repo_id)
976
977 # add repo
978 self.log.debug("add repo {}".format(db_repo["name"]))
979 await self.repo_add(
980 cluster_uuid, db_repo["name"], db_repo["url"]
981 )
982 added_repo_dict[repo_id] = db_repo["name"]
983 except Exception as e:
984 raise K8sException(
985 "Error adding repo id: {}, err_msg: {} ".format(
986 repo_id, repr(e)
987 )
988 )
989
990 # Delete repos that are present but not in nbi_list
991 for repo_name in local_repo_dict:
992 if not db_repo_dict.get(repo_name) and repo_name != "stable":
993 self.log.debug("delete repo {}".format(repo_name))
994 try:
995 await self.repo_remove(cluster_uuid, repo_name)
996 deleted_repo_list.append(repo_name)
997 except Exception as e:
998 self.warning(
999 "Error deleting repo, name: {}, err_msg: {}".format(
1000 repo_name, str(e)
1001 )
1002 )
1003
1004 return deleted_repo_list, added_repo_dict
1005
1006 except K8sException:
1007 raise
1008 except Exception as e:
1009 # Do not raise errors synchronizing repos
1010 self.log.error("Error synchronizing repos: {}".format(e))
1011 raise Exception("Error synchronizing repos: {}".format(e))
1012
1013 def _get_db_repos_dict(self, repo_ids: list):
1014 db_repos_dict = {}
1015 for repo_id in repo_ids:
1016 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
1017 db_repos_dict[db_repo["name"]] = db_repo
1018 return db_repos_dict
1019
1020 """
1021 ####################################################################################
1022 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1023 ####################################################################################
1024 """
1025
1026 @abc.abstractmethod
1027 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
1028 """
1029 Creates and returns base cluster and kube dirs and returns them.
1030 Also created helm3 dirs according to new directory specification, paths are
1031 not returned but assigned to helm environment variables
1032
1033 :param cluster_name: cluster_name
1034 :return: Dictionary with config_paths and dictionary with helm environment variables
1035 """
1036
1037 @abc.abstractmethod
1038 async def _cluster_init(self, cluster_id, namespace, paths, env):
1039 """
1040 Implements the helm version dependent cluster initialization
1041 """
1042
1043 @abc.abstractmethod
1044 async def _instances_list(self, cluster_id):
1045 """
1046 Implements the helm version dependent helm instances list
1047 """
1048
1049 @abc.abstractmethod
1050 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
1051 """
1052 Implements the helm version dependent method to obtain services from a helm instance
1053 """
1054
1055 @abc.abstractmethod
1056 async def _status_kdu(
1057 self,
1058 cluster_id: str,
1059 kdu_instance: str,
1060 namespace: str = None,
1061 yaml_format: bool = False,
1062 show_error_log: bool = False,
1063 ) -> Union[str, dict]:
1064 """
1065 Implements the helm version dependent method to obtain status of a helm instance
1066 """
1067
1068 @abc.abstractmethod
1069 def _get_install_command(
1070 self,
1071 kdu_model,
1072 kdu_instance,
1073 namespace,
1074 params_str,
1075 version,
1076 atomic,
1077 timeout,
1078 kubeconfig,
1079 ) -> str:
1080 """
1081 Obtain command to be executed to delete the indicated instance
1082 """
1083
1084 @abc.abstractmethod
1085 def _get_upgrade_command(
1086 self,
1087 kdu_model,
1088 kdu_instance,
1089 namespace,
1090 params_str,
1091 version,
1092 atomic,
1093 timeout,
1094 kubeconfig,
1095 ) -> str:
1096 """
1097 Obtain command to be executed to upgrade the indicated instance
1098 """
1099
1100 @abc.abstractmethod
1101 def _get_rollback_command(
1102 self, kdu_instance, namespace, revision, kubeconfig
1103 ) -> str:
1104 """
1105 Obtain command to be executed to rollback the indicated instance
1106 """
1107
1108 @abc.abstractmethod
1109 def _get_uninstall_command(
1110 self, kdu_instance: str, namespace: str, kubeconfig: str
1111 ) -> str:
1112 """
1113 Obtain command to be executed to delete the indicated instance
1114 """
1115
1116 @abc.abstractmethod
1117 def _get_inspect_command(
1118 self, show_command: str, kdu_model: str, repo_str: str, version: str
1119 ):
1120 """
1121 Obtain command to be executed to obtain information about the kdu
1122 """
1123
1124 @abc.abstractmethod
1125 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1126 """
1127 Method call to uninstall cluster software for helm. This method is dependent
1128 of helm version
1129 For Helm v2 it will be called when Tiller must be uninstalled
1130 For Helm v3 it does nothing and does not need to be callled
1131 """
1132
1133 @abc.abstractmethod
1134 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1135 """
1136 Obtains the cluster repos identifiers
1137 """
1138
1139 """
1140 ####################################################################################
1141 ################################### P R I V A T E ##################################
1142 ####################################################################################
1143 """
1144
1145 @staticmethod
1146 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1147 if os.path.exists(filename):
1148 return True
1149 else:
1150 msg = "File {} does not exist".format(filename)
1151 if exception_if_not_exists:
1152 raise K8sException(msg)
1153
1154 @staticmethod
1155 def _remove_multiple_spaces(strobj):
1156 strobj = strobj.strip()
1157 while " " in strobj:
1158 strobj = strobj.replace(" ", " ")
1159 return strobj
1160
1161 @staticmethod
1162 def _output_to_lines(output: str) -> list:
1163 output_lines = list()
1164 lines = output.splitlines(keepends=False)
1165 for line in lines:
1166 line = line.strip()
1167 if len(line) > 0:
1168 output_lines.append(line)
1169 return output_lines
1170
1171 @staticmethod
1172 def _output_to_table(output: str) -> list:
1173 output_table = list()
1174 lines = output.splitlines(keepends=False)
1175 for line in lines:
1176 line = line.replace("\t", " ")
1177 line_list = list()
1178 output_table.append(line_list)
1179 cells = line.split(sep=" ")
1180 for cell in cells:
1181 cell = cell.strip()
1182 if len(cell) > 0:
1183 line_list.append(cell)
1184 return output_table
1185
1186 @staticmethod
1187 def _parse_services(output: str) -> list:
1188 lines = output.splitlines(keepends=False)
1189 services = []
1190 for line in lines:
1191 line = line.replace("\t", " ")
1192 cells = line.split(sep=" ")
1193 if len(cells) > 0 and cells[0].startswith("service/"):
1194 elems = cells[0].split(sep="/")
1195 if len(elems) > 1:
1196 services.append(elems[1])
1197 return services
1198
1199 @staticmethod
1200 def _get_deep(dictionary: dict, members: tuple):
1201 target = dictionary
1202 value = None
1203 try:
1204 for m in members:
1205 value = target.get(m)
1206 if not value:
1207 return None
1208 else:
1209 target = value
1210 except Exception:
1211 pass
1212 return value
1213
1214 # find key:value in several lines
1215 @staticmethod
1216 def _find_in_lines(p_lines: list, p_key: str) -> str:
1217 for line in p_lines:
1218 try:
1219 if line.startswith(p_key + ":"):
1220 parts = line.split(":")
1221 the_value = parts[1].strip()
1222 return the_value
1223 except Exception:
1224 # ignore it
1225 pass
1226 return None
1227
1228 @staticmethod
1229 def _lower_keys_list(input_list: list):
1230 """
1231 Transform the keys in a list of dictionaries to lower case and returns a new list
1232 of dictionaries
1233 """
1234 new_list = []
1235 if input_list:
1236 for dictionary in input_list:
1237 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1238 new_list.append(new_dict)
1239 return new_list
1240
1241 async def _local_async_exec(
1242 self,
1243 command: str,
1244 raise_exception_on_error: bool = False,
1245 show_error_log: bool = True,
1246 encode_utf8: bool = False,
1247 env: dict = None,
1248 ) -> (str, int):
1249
1250 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1251 self.log.debug(
1252 "Executing async local command: {}, env: {}".format(command, env)
1253 )
1254
1255 # split command
1256 command = shlex.split(command)
1257
1258 environ = os.environ.copy()
1259 if env:
1260 environ.update(env)
1261
1262 try:
1263 process = await asyncio.create_subprocess_exec(
1264 *command,
1265 stdout=asyncio.subprocess.PIPE,
1266 stderr=asyncio.subprocess.PIPE,
1267 env=environ,
1268 )
1269
1270 # wait for command terminate
1271 stdout, stderr = await process.communicate()
1272
1273 return_code = process.returncode
1274
1275 output = ""
1276 if stdout:
1277 output = stdout.decode("utf-8").strip()
1278 # output = stdout.decode()
1279 if stderr:
1280 output = stderr.decode("utf-8").strip()
1281 # output = stderr.decode()
1282
1283 if return_code != 0 and show_error_log:
1284 self.log.debug(
1285 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1286 )
1287 else:
1288 self.log.debug("Return code: {}".format(return_code))
1289
1290 if raise_exception_on_error and return_code != 0:
1291 raise K8sException(output)
1292
1293 if encode_utf8:
1294 output = output.encode("utf-8").strip()
1295 output = str(output).replace("\\n", "\n")
1296
1297 return output, return_code
1298
1299 except asyncio.CancelledError:
1300 raise
1301 except K8sException:
1302 raise
1303 except Exception as e:
1304 msg = "Exception executing command: {} -> {}".format(command, e)
1305 self.log.error(msg)
1306 if raise_exception_on_error:
1307 raise K8sException(e) from e
1308 else:
1309 return "", -1
1310
1311 async def _local_async_exec_pipe(
1312 self,
1313 command1: str,
1314 command2: str,
1315 raise_exception_on_error: bool = True,
1316 show_error_log: bool = True,
1317 encode_utf8: bool = False,
1318 env: dict = None,
1319 ):
1320
1321 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1322 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1323 command = "{} | {}".format(command1, command2)
1324 self.log.debug(
1325 "Executing async local command: {}, env: {}".format(command, env)
1326 )
1327
1328 # split command
1329 command1 = shlex.split(command1)
1330 command2 = shlex.split(command2)
1331
1332 environ = os.environ.copy()
1333 if env:
1334 environ.update(env)
1335
1336 try:
1337 read, write = os.pipe()
1338 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1339 os.close(write)
1340 process_2 = await asyncio.create_subprocess_exec(
1341 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1342 )
1343 os.close(read)
1344 stdout, stderr = await process_2.communicate()
1345
1346 return_code = process_2.returncode
1347
1348 output = ""
1349 if stdout:
1350 output = stdout.decode("utf-8").strip()
1351 # output = stdout.decode()
1352 if stderr:
1353 output = stderr.decode("utf-8").strip()
1354 # output = stderr.decode()
1355
1356 if return_code != 0 and show_error_log:
1357 self.log.debug(
1358 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1359 )
1360 else:
1361 self.log.debug("Return code: {}".format(return_code))
1362
1363 if raise_exception_on_error and return_code != 0:
1364 raise K8sException(output)
1365
1366 if encode_utf8:
1367 output = output.encode("utf-8").strip()
1368 output = str(output).replace("\\n", "\n")
1369
1370 return output, return_code
1371 except asyncio.CancelledError:
1372 raise
1373 except K8sException:
1374 raise
1375 except Exception as e:
1376 msg = "Exception executing command: {} -> {}".format(command, e)
1377 self.log.error(msg)
1378 if raise_exception_on_error:
1379 raise K8sException(e) from e
1380 else:
1381 return "", -1
1382
1383 async def _get_service(self, cluster_id, service_name, namespace):
1384 """
1385 Obtains the data of the specified service in the k8cluster.
1386
1387 :param cluster_id: id of a K8s cluster known by OSM
1388 :param service_name: name of the K8s service in the specified namespace
1389 :param namespace: K8s namespace used by the KDU instance
1390 :return: If successful, it will return a service with the following data:
1391 - `name` of the service
1392 - `type` type of service in the k8 cluster
1393 - `ports` List of ports offered by the service, for each port includes at least
1394 name, port, protocol
1395 - `cluster_ip` Internal ip to be used inside k8s cluster
1396 - `external_ip` List of external ips (in case they are available)
1397 """
1398
1399 # init config, env
1400 paths, env = self._init_paths_env(
1401 cluster_name=cluster_id, create_if_not_exist=True
1402 )
1403
1404 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1405 self.kubectl_command, paths["kube_config"], namespace, service_name
1406 )
1407
1408 output, _rc = await self._local_async_exec(
1409 command=command, raise_exception_on_error=True, env=env
1410 )
1411
1412 data = yaml.load(output, Loader=yaml.SafeLoader)
1413
1414 service = {
1415 "name": service_name,
1416 "type": self._get_deep(data, ("spec", "type")),
1417 "ports": self._get_deep(data, ("spec", "ports")),
1418 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1419 }
1420 if service["type"] == "LoadBalancer":
1421 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1422 ip_list = [elem["ip"] for elem in ip_map_list]
1423 service["external_ip"] = ip_list
1424
1425 return service
1426
1427 async def _exec_inspect_comand(
1428 self, inspect_command: str, kdu_model: str, repo_url: str = None
1429 ):
1430 """
1431 Obtains information about a kdu, no cluster (no env)
1432 """
1433
1434 repo_str = ""
1435 if repo_url:
1436 repo_str = " --repo {}".format(repo_url)
1437
1438 idx = kdu_model.find("/")
1439 if idx >= 0:
1440 idx += 1
1441 kdu_model = kdu_model[idx:]
1442
1443 version = ""
1444 if ":" in kdu_model:
1445 parts = kdu_model.split(sep=":")
1446 if len(parts) == 2:
1447 version = "--version {}".format(str(parts[1]))
1448 kdu_model = parts[0]
1449
1450 full_command = self._get_inspect_command(
1451 inspect_command, kdu_model, repo_str, version
1452 )
1453 output, _rc = await self._local_async_exec(
1454 command=full_command, encode_utf8=True
1455 )
1456
1457 return output
1458
1459 async def _store_status(
1460 self,
1461 cluster_id: str,
1462 operation: str,
1463 kdu_instance: str,
1464 namespace: str = None,
1465 check_every: float = 10,
1466 db_dict: dict = None,
1467 run_once: bool = False,
1468 ):
1469 while True:
1470 try:
1471 await asyncio.sleep(check_every)
1472 detailed_status = await self._status_kdu(
1473 cluster_id=cluster_id,
1474 kdu_instance=kdu_instance,
1475 yaml_format=False,
1476 namespace=namespace,
1477 )
1478 status = detailed_status.get("info").get("description")
1479 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1480 # write status to db
1481 result = await self.write_app_status_to_db(
1482 db_dict=db_dict,
1483 status=str(status),
1484 detailed_status=str(detailed_status),
1485 operation=operation,
1486 )
1487 if not result:
1488 self.log.info("Error writing in database. Task exiting...")
1489 return
1490 except asyncio.CancelledError:
1491 self.log.debug("Task cancelled")
1492 return
1493 except Exception as e:
1494 self.log.debug(
1495 "_store_status exception: {}".format(str(e)), exc_info=True
1496 )
1497 pass
1498 finally:
1499 if run_once:
1500 return
1501
1502 # params for use in -f file
1503 # returns values file option and filename (in order to delete it at the end)
1504 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1505
1506 if params and len(params) > 0:
1507 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1508
1509 def get_random_number():
1510 r = random.randrange(start=1, stop=99999999)
1511 s = str(r)
1512 while len(s) < 10:
1513 s = "0" + s
1514 return s
1515
1516 params2 = dict()
1517 for key in params:
1518 value = params.get(key)
1519 if "!!yaml" in str(value):
1520 value = yaml.load(value[7:])
1521 params2[key] = value
1522
1523 values_file = get_random_number() + ".yaml"
1524 with open(values_file, "w") as stream:
1525 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1526
1527 return "-f {}".format(values_file), values_file
1528
1529 return "", None
1530
1531 # params for use in --set option
1532 @staticmethod
1533 def _params_to_set_option(params: dict) -> str:
1534 params_str = ""
1535 if params and len(params) > 0:
1536 start = True
1537 for key in params:
1538 value = params.get(key, None)
1539 if value is not None:
1540 if start:
1541 params_str += "--set "
1542 start = False
1543 else:
1544 params_str += ","
1545 params_str += "{}={}".format(key, value)
1546 return params_str
1547
1548 @staticmethod
1549 def generate_kdu_instance_name(**kwargs):
1550 chart_name = kwargs["kdu_model"]
1551 # check embeded chart (file or dir)
1552 if chart_name.startswith("/"):
1553 # extract file or directory name
1554 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1555 # check URL
1556 elif "://" in chart_name:
1557 # extract last portion of URL
1558 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1559
1560 name = ""
1561 for c in chart_name:
1562 if c.isalpha() or c.isnumeric():
1563 name += c
1564 else:
1565 name += "-"
1566 if len(name) > 35:
1567 name = name[0:35]
1568
1569 # if does not start with alpha character, prefix 'a'
1570 if not name[0].isalpha():
1571 name = "a" + name
1572
1573 name += "-"
1574
1575 def get_random_number():
1576 r = random.randrange(start=1, stop=99999999)
1577 s = str(r)
1578 s = s.rjust(10, "0")
1579 return s
1580
1581 name = name + get_random_number()
1582 return name.lower()
1583
1584 async def _split_repo(self, kdu_model: str) -> str:
1585 repo_name = None
1586 idx = kdu_model.find("/")
1587 if idx >= 0:
1588 repo_name = kdu_model[:idx]
1589 return repo_name