Bug 1964 fixed: removed the variable cluster_uuid from init_env method
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import os
30 import yaml
31 from uuid import uuid4
32
33 from n2vc.config import EnvironConfig
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45
46 service_account = "osm"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Set up the helm-version independent state of the connector.

    :param fs: file system object used for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to the kubectl executable
    :param helm_command: path to the helm executable
    :param log: logger
    :param on_update_db: callback called when the connector updates the database
    :raises K8sException: if the kubectl or helm executable does not exist
    """

    # the parent initializer receives db/log/on_update_db; self.log is used
    # right below, so it is expected to be available after this call
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # seed the generator used for release name generation
    random.seed(time.time())

    self.fs = fs

    # both binaries are mandatory: fail early when one of them is missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url comes from the configuration; the literal string "None"
    # means that no stable repo must be used
    configured_url = self.config.get("stablerepourl")
    self._stable_repo_url = None if configured_url == "None" else configured_url
91
92 def _get_namespace(self, cluster_uuid: str) -> str:
93 """
94 Obtains the namespace used by the cluster with the uuid passed by argument
95
96 param: cluster_uuid: cluster's uuid
97 """
98
99 # first, obtain the cluster corresponding to the uuid passed by argument
100 k8scluster = self.db.get_one(
101 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
102 )
103 return k8scluster.get("namespace")
104
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """

    cluster_id = reuse_cluster_uuid if reuse_cluster_uuid else str(uuid4())

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The credentials file must only be accessible by its owner. The permission
    # bits are given to os.open() so the file is created restricted from the
    # start (the third positional argument of the builtin open() is the
    # buffering size, not the file mode).
    kube_config = paths["kube_config"]
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(kube_config, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # os.open() only applies the mode on creation: enforce it for reused files
    os.chmod(kube_config, mode)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_id, n2vc_installed_sw
152
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository in the environment of the given cluster.

    A "helm repo update" is executed first (its failure is tolerated) and then
    "helm repo add" is executed (its failure raises).

    :param cluster_uuid: cluster whose helm environment is used
    :param name: name of the repository
    :param url: url of the repository
    :param repo_type: only used in the log message
    """
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_uuid, repo_type, name, url
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    kube_config = paths["kube_config"]
    # (log template, command, raise on error)
    steps = (
        (
            "updating repo: {}",
            "env KUBECONFIG={} {} repo update".format(kube_config, self._helm_command),
            False,
        ),
        (
            "adding repo: {}",
            "env KUBECONFIG={} {} repo add {} {}".format(
                kube_config, self._helm_command, name, url
            ),
            True,
        ),
    )
    for log_template, helm_cmd, must_succeed in steps:
        self.log.debug(log_template.format(helm_cmd))
        await self._local_async_exec(
            command=helm_cmd, raise_exception_on_error=must_succeed, env=env
        )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
190
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of repositories registered in the helm environment of a cluster

    :param cluster_uuid: cluster whose helm environment is queried
    :return: parsed "helm repo list" yaml output with lowercase keys, or an
        empty list when the command fails or prints nothing
    """

    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    list_cmd = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # errors are not raised: when there are no repos an empty list is wanted
    output, _rc = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    if _rc != 0 or not output:
        return []

    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(yaml.load(output, Loader=yaml.SafeLoader))
229
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the helm environment of a cluster.

    :param cluster_uuid: cluster whose helm environment is used
    :param name: name of the repository to remove
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    kube_config = paths["kube_config"]
    remove_cmd = "env KUBECONFIG={} {} repo remove {}".format(
        kube_config, self._helm_command, name
    )
    # a failing removal is reported to the caller as an exception
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
252
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
    :param uninstall_sw: Boolean to also uninstall the helm software of the cluster
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace = self._get_namespace(cluster_uuid=cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_uuid, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # bound before the try so the except handler can always
                    # reference it
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_uuid)
                self.log.warning(msg)
                # Allow to remove k8s cluster without removing Tiller
                uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id=cluster_uuid, namespace=namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_uuid))
    self.fs.file_delete(cluster_uuid, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_uuid
    shutil.rmtree(direct, ignore_errors=True)

    return True
324
325 async def _install_impl(
326 self,
327 cluster_id: str,
328 kdu_model: str,
329 paths: dict,
330 env: dict,
331 kdu_instance: str,
332 atomic: bool = True,
333 timeout: float = 300,
334 params: dict = None,
335 db_dict: dict = None,
336 kdu_name: str = None,
337 namespace: str = None,
338 ):
339 # init env, paths
340 paths, env = self._init_paths_env(
341 cluster_name=cluster_id, create_if_not_exist=True
342 )
343
344 # params to str
345 params_str, file_to_delete = self._params_to_file_option(
346 cluster_id=cluster_id, params=params
347 )
348
349 # version
350 version = None
351 if ":" in kdu_model:
352 parts = kdu_model.split(sep=":")
353 if len(parts) == 2:
354 version = str(parts[1])
355 kdu_model = parts[0]
356
357 command = self._get_install_command(
358 kdu_model,
359 kdu_instance,
360 namespace,
361 params_str,
362 version,
363 atomic,
364 timeout,
365 paths["kube_config"],
366 )
367
368 self.log.debug("installing: {}".format(command))
369
370 if atomic:
371 # exec helm in a task
372 exec_task = asyncio.ensure_future(
373 coro_or_future=self._local_async_exec(
374 command=command, raise_exception_on_error=False, env=env
375 )
376 )
377
378 # write status in another task
379 status_task = asyncio.ensure_future(
380 coro_or_future=self._store_status(
381 cluster_id=cluster_id,
382 kdu_instance=kdu_instance,
383 namespace=namespace,
384 db_dict=db_dict,
385 operation="install",
386 run_once=False,
387 )
388 )
389
390 # wait for execution task
391 await asyncio.wait([exec_task])
392
393 # cancel status task
394 status_task.cancel()
395
396 output, rc = exec_task.result()
397
398 else:
399
400 output, rc = await self._local_async_exec(
401 command=command, raise_exception_on_error=False, env=env
402 )
403
404 # remove temporal values yaml file
405 if file_to_delete:
406 os.remove(file_to_delete)
407
408 # write final status
409 await self._store_status(
410 cluster_id=cluster_id,
411 kdu_instance=kdu_instance,
412 namespace=namespace,
413 db_dict=db_dict,
414 operation="install",
415 run_once=True,
416 check_every=0,
417 )
418
419 if rc != 0:
420 msg = "Error executing command: {}\nOutput: {}".format(command, output)
421 self.log.error(msg)
422 raise K8sException(msg)
423
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing release with the helm upgrade command.

    :param cluster_uuid: cluster where the release lives
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference, optionally followed by ":<version>"
    :param atomic: passed to the upgrade command; when True the status is also
        written in a parallel task while helm runs
    :param timeout: passed to the upgrade command
    :param params: values for the chart, passed to _params_to_file_option
    :param db_dict: passed to _store_status
    :return: revision of the release after the upgrade, 0 if the release is
        not found afterwards
    :raises K8sException: if the release does not exist or the helm command
        returns a non zero code
    """
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_uuid, params=params
    )

    status_task = None
    try:
        # version
        # NOTE(review): kdu_model defaults to None but the membership test below
        # needs a string - confirm that callers always provide it
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_upgrade_command(
            kdu_model,
            kdu_instance,
            instance_info["namespace"],
            params_str,
            version,
            atomic,
            timeout,
            paths["kube_config"],
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_uuid,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            output, rc = exec_task.result()
        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # done in a finally block so that a failure or a cancellation does not
        # leave the status task running nor the temporal values file behind
        if status_task is not None:
            status_task.cancel()
        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
544
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scaling is not implemented by this connector.

    :param kdu_instance: not used
    :param scale: not used
    :param resource_name: not used
    :param total_timeout: not used
    :param kwargs: not used
    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
554
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Obtaining the scale count is not implemented by this connector.

    :param resource_name: not used
    :param kdu_instance: not used
    :param kwargs: not used
    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
562
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a release back to a given revision with the helm rollback command.

    :param cluster_uuid: cluster where the release lives
    :param kdu_instance: name of the release
    :param revision: revision passed to the rollback command
    :param db_dict: passed to _store_status
    :return: revision of the release after the rollback, 0 if the release is
        not found afterwards
    :raises K8sException: if the release does not exist or the helm command
        returns a non zero code
    """
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the release is looked up to obtain its namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    release_namespace = instance_info["namespace"]
    command = self._get_rollback_command(
        kdu_instance, release_namespace, revision, paths["kube_config"]
    )

    self.log.debug("rolling_back: {}".format(command))

    # helm runs in one task while the status is written in another one
    helm_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    watcher_task = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_uuid,
            kdu_instance=kdu_instance,
            namespace=release_namespace,
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    # only the helm task is awaited; the status writer is stopped afterwards
    await asyncio.wait([helm_task])
    watcher_task.cancel()

    output, rc = helm_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return new revision number
    refreshed = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not refreshed:
        return 0
    revision = int(refreshed.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
649
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance using the helm version specific uninstall
    command (this call should happen after all _terminate-config-primitive_ of
    the VNF are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True if the instance does not exist, otherwise the output of the
        command converted with _output_to_table
    """

    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the release is looked up to obtain its namespace; a missing release is
    # not considered an error
    release = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release:
        self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    uninstall_cmd = self._get_uninstall_command(
        kdu_instance, release["namespace"], paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=uninstall_cmd, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(output)
695
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Returns the deployed releases of a cluster, as given by the helm version
    specific _instances_list, synchronizing the file system before and after.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: result of _instances_list for the cluster
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # local dir is refreshed before running the internal command and pushed
    # back once it has finished
    self.fs.sync(from_path=cluster_uuid)
    releases = await self._instances_list(cluster_uuid)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
716
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look for a release by name among the releases of a cluster.

    :param cluster_uuid: cluster whose releases are listed
    :param kdu_instance: name of the release
    :return: first entry of instances_list whose "name" matches, None if there
        is no such entry
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
724
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Primitives are not supported for KDUs deployed with Helm, so this method
    always raises and none of its parameters is used.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    raise K8sException(
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
751
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    The service names are obtained with the helm version specific
    _get_services and each one is then resolved with _get_service.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get list of services names for kdu
    names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    # services are resolved one after another, in the order of their names
    service_list = [
        await self._get_service(cluster_uuid, name, namespace) for name in names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
799
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of one service, as returned by _get_service, synchronizing
    the file system before and after.

    :param cluster_uuid: cluster where the service lives
    :param service_name: name of the service
    :param namespace: namespace of the service
    :return: result of _get_service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # local dir is refreshed before the query and pushed back afterwards
    self.fs.sync(from_path=cluster_uuid)
    service_data = await self._get_service(cluster_uuid, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_data
819
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    Retrieve the current state of a given KDU instance, based on the helm
    `status` call. The release is first located in the cluster to obtain its
    namespace and then the helm version specific _status_kdu is invoked asking
    for a text result.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
          - UNKNOWN
          - DEPLOYED
          - DELETED
          - SUPERSEDED
          - FAILED or
          - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
          and the status of those resources
        - Last `deployment_time`.
    :raises K8sException: if the instance is not found in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    releases = await self._instances_list(cluster_id=cluster_uuid)
    release = next(
        (item for item in releases if item.get("name") == kdu_instance), None
    )
    if release is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
878
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "values" inspect command for a kdu model.

    :param kdu_model: kdu model to inspect
    :param repo_url: optional repository url
    :return: result of _exec_inspect_comand
    """

    log_line = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(log_line)

    inspect_output = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspect_output
890
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "readme" inspect command for a kdu model.

    :param kdu_model: kdu model to inspect
    :param repo_url: optional repository url
    :return: result of _exec_inspect_comand
    """

    log_line = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(log_line)

    readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
900
async def synchronize_repos(self, cluster_uuid: str):
    """
    Make the helm repositories of a cluster match the ones stored in the database.

    Repositories of the database that are missing, or registered with another
    url, are (re)added; repositories registered in helm that are not in the
    database are removed, except the one named "stable".

    :param cluster_uuid: cluster whose repositories are synchronized
    :return: tuple (deleted_repo_list, added_repo_dict). The list holds the ids
        of the database repos removed to be re-added and the names of the
        removed local repos; the dict maps the id of each added repo to its name
    :raises K8sException: if a repository cannot be added
    :raises Exception: on any other error
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # bound before the try so the error message can always use it
            repo_id = None
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                repo_id = db_repo.get("_id")
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and and again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # a repo that cannot be removed is only reported
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other error is logged and reported as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e))
966
967 def _get_db_repos_dict(self, repo_ids: list):
968 db_repos_dict = {}
969 for repo_id in repo_ids:
970 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
971 db_repos_dict[db_repo["name"]] = db_repo
972 return db_repos_dict
973
974 """
975 ####################################################################################
976 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
977 ####################################################################################
978 """
979
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    Also creates the helm3 dirs according to the new directory specification;
    those paths are not returned but assigned to helm environment variables.

    Callers in this class unpack the result as (paths, env) and read the
    kubeconfig file name from paths["kube_config"].

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably, create the directories when they
        are missing - confirm against the implementations
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
990
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    It is awaited by init_env with the paths and env obtained from
    _init_paths_env; its result is returned by init_env as the flag that
    tells whether software has been installed in the cluster.
    """
996
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    Callers in this class iterate the result and read the "name" and
    "namespace" keys of each entry.
    """
1002
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    get_services iterates the result and uses each item as a service name.
    """
1008
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Implements the helm version dependent method to obtain status of a helm instance

    status_kdu calls it with show_error_log=True and return_text=True and
    returns its result unchanged.
    """
1021
@abc.abstractmethod
def _get_install_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to install the indicated instance
    """
1037
@abc.abstractmethod
def _get_upgrade_command(
    self,
    kdu_model,
    kdu_instance,
    namespace,
    params_str,
    version,
    atomic,
    timeout,
    kubeconfig,
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    upgrade passes the model without its ":<version>" suffix, the version
    parsed from that suffix (or None) and the kubeconfig file name.
    """
1053
@abc.abstractmethod
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    rollback passes the namespace of the existing release and the kubeconfig
    file name.
    """
1061
@abc.abstractmethod
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    uninstall passes the namespace of the existing release and the kubeconfig
    file name.
    """
1069
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain command to be executed to obtain information about the kdu
    """
1077
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method called to uninstall the cluster software for helm. This method
    depends on the helm version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called
    """
1086
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    synchronize_repos passes the result to _get_db_repos_dict, which uses
    each identifier as the "_id" of a "k8srepos" record.
    """
1092
1093 """
1094 ####################################################################################
1095 ################################### P R I V A T E ##################################
1096 ####################################################################################
1097 """
1098
1099 @staticmethod
1100 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1101 if os.path.exists(filename):
1102 return True
1103 else:
1104 msg = "File {} does not exist".format(filename)
1105 if exception_if_not_exists:
1106 raise K8sException(msg)
1107
1108 @staticmethod
1109 def _remove_multiple_spaces(strobj):
1110 strobj = strobj.strip()
1111 while " " in strobj:
1112 strobj = strobj.replace(" ", " ")
1113 return strobj
1114
1115 @staticmethod
1116 def _output_to_lines(output: str) -> list:
1117 output_lines = list()
1118 lines = output.splitlines(keepends=False)
1119 for line in lines:
1120 line = line.strip()
1121 if len(line) > 0:
1122 output_lines.append(line)
1123 return output_lines
1124
1125 @staticmethod
1126 def _output_to_table(output: str) -> list:
1127 output_table = list()
1128 lines = output.splitlines(keepends=False)
1129 for line in lines:
1130 line = line.replace("\t", " ")
1131 line_list = list()
1132 output_table.append(line_list)
1133 cells = line.split(sep=" ")
1134 for cell in cells:
1135 cell = cell.strip()
1136 if len(cell) > 0:
1137 line_list.append(cell)
1138 return output_table
1139
1140 @staticmethod
1141 def _parse_services(output: str) -> list:
1142 lines = output.splitlines(keepends=False)
1143 services = []
1144 for line in lines:
1145 line = line.replace("\t", " ")
1146 cells = line.split(sep=" ")
1147 if len(cells) > 0 and cells[0].startswith("service/"):
1148 elems = cells[0].split(sep="/")
1149 if len(elems) > 1:
1150 services.append(elems[1])
1151 return services
1152
1153 @staticmethod
1154 def _get_deep(dictionary: dict, members: tuple):
1155 target = dictionary
1156 value = None
1157 try:
1158 for m in members:
1159 value = target.get(m)
1160 if not value:
1161 return None
1162 else:
1163 target = value
1164 except Exception:
1165 pass
1166 return value
1167
1168 # find key:value in several lines
1169 @staticmethod
1170 def _find_in_lines(p_lines: list, p_key: str) -> str:
1171 for line in p_lines:
1172 try:
1173 if line.startswith(p_key + ":"):
1174 parts = line.split(":")
1175 the_value = parts[1].strip()
1176 return the_value
1177 except Exception:
1178 # ignore it
1179 pass
1180 return None
1181
1182 @staticmethod
1183 def _lower_keys_list(input_list: list):
1184 """
1185 Transform the keys in a list of dictionaries to lower case and returns a new list
1186 of dictionaries
1187 """
1188 new_list = []
1189 if input_list:
1190 for dictionary in input_list:
1191 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1192 new_list.append(new_dict)
1193 return new_list
1194
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command, without a shell, and collect its output

    :param command: command line; repeated spaces are removed and the result is
        tokenized with shlex
    :param raise_exception_on_error: raise K8sException if the command ends with
        a non-zero return code or cannot be executed at all
    :param show_error_log: log the output of the command when it fails
    :param encode_utf8: return the text produced by ``str()`` over the UTF-8
        encoded output (that is, the ``b'...'`` representation), with its
        escaped line feeds turned into real ones
    :param env: extra environment variables, applied over a copy of os.environ
    :return: tuple (output, return code). When the command succeeds the output
        is its stdout (its stderr only if stdout is empty); when it fails the
        output is its stderr (its stdout only if stderr is empty). ("", -1) is
        returned when the command could not be executed and
        raise_exception_on_error is not set
    :raises K8sException: see raise_exception_on_error
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""

        if return_code == 0:
            # a successful command may still print warnings to stderr; they must
            # not replace the result callers are going to parse
            output = out_text or err_text
            if out_text and err_text:
                self.log.debug("Command stderr (ignored):\n{}".format(err_text))
        else:
            # on failure the error text is the interesting part
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        # e.g. executable not found; command is the token list at this point
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1264
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Run "command1 | command2" locally, without a shell, and collect the output
    of command2

    The stdout of command1 is connected to the stdin of command2 through an OS
    pipe. Only the return code of command2 is looked at; the exit status of
    command1 is not checked.

    :param command1: command line that produces the data
    :param command2: command line that consumes the data
    :param raise_exception_on_error: raise K8sException if command2 ends with a
        non-zero return code or the commands cannot be executed
    :param show_error_log: log the output of command2 when it fails
    :param encode_utf8: return the text produced by ``str()`` over the UTF-8
        encoded output (that is, the ``b'...'`` representation), with its
        escaped line feeds turned into real ones
    :param env: extra environment variables, applied over a copy of os.environ
    :return: tuple (output, return code of command2). When command2 succeeds the
        output is its stdout (its stderr only if stdout is empty); when it fails
        the output is its stderr (its stdout only if stderr is empty).
        ("", -1) is returned when the commands could not be executed and
        raise_exception_on_error is not set
    :raises K8sException: see raise_exception_on_error
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read_fd, write_fd = os.pipe()
        try:
            try:
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write_fd, env=environ
                )
            finally:
                # the parent has to drop its copy of the write end, otherwise
                # command2 would never see the end of its input
                os.close(write_fd)
            process_2 = await asyncio.create_subprocess_exec(
                *command2,
                stdin=read_fd,
                stdout=asyncio.subprocess.PIPE,
                # without this pipe communicate() never returns any error text
                stderr=asyncio.subprocess.PIPE,
                env=environ,
            )
        finally:
            # released in every case, also when a process fails to start
            os.close(read_fd)

        stdout, stderr = await process_2.communicate()
        # do not leave the producer behind: wait until it has finished too
        await process_1.wait()

        return_code = process_2.returncode

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""

        if return_code == 0:
            # warnings printed on stderr must not replace a valid result
            output = out_text or err_text
            if out_text and err_text:
                self.log.debug("Command stderr (ignored):\n{}".format(err_text))
        else:
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1336
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The service is read with "kubectl get service <name> -o=yaml" and the
    resulting YAML is parsed with the safe loader.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; it is an empty list while
    the load balancer has not reported any ip
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # the ingress list is missing until the load balancer is provisioned,
        # and an entry may carry a hostname instead of an ip
        ingress = self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        service["external_ip"] = [entry["ip"] for entry in ingress if "ip" in entry]

    return service
1380
1381 async def _exec_inspect_comand(
1382 self, inspect_command: str, kdu_model: str, repo_url: str = None
1383 ):
1384 """
1385 Obtains information about a kdu, no cluster (no env)
1386 """
1387
1388 repo_str = ""
1389 if repo_url:
1390 repo_str = " --repo {}".format(repo_url)
1391
1392 idx = kdu_model.find("/")
1393 if idx >= 0:
1394 idx += 1
1395 kdu_model = kdu_model[idx:]
1396
1397 version = ""
1398 if ":" in kdu_model:
1399 parts = kdu_model.split(sep=":")
1400 if len(parts) == 2:
1401 version = "--version {}".format(str(parts[1]))
1402 kdu_model = parts[0]
1403
1404 full_command = self._get_inspect_command(
1405 inspect_command, kdu_model, repo_str, version
1406 )
1407 output, _rc = await self._local_async_exec(
1408 command=full_command, encode_utf8=True
1409 )
1410
1411 return output
1412
1413 async def _store_status(
1414 self,
1415 cluster_id: str,
1416 operation: str,
1417 kdu_instance: str,
1418 namespace: str = None,
1419 check_every: float = 10,
1420 db_dict: dict = None,
1421 run_once: bool = False,
1422 ):
1423 while True:
1424 try:
1425 await asyncio.sleep(check_every)
1426 detailed_status = await self._status_kdu(
1427 cluster_id=cluster_id,
1428 kdu_instance=kdu_instance,
1429 namespace=namespace,
1430 return_text=False,
1431 )
1432 status = detailed_status.get("info").get("description")
1433 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1434 # write status to db
1435 result = await self.write_app_status_to_db(
1436 db_dict=db_dict,
1437 status=str(status),
1438 detailed_status=str(detailed_status),
1439 operation=operation,
1440 )
1441 if not result:
1442 self.log.info("Error writing in database. Task exiting...")
1443 return
1444 except asyncio.CancelledError:
1445 self.log.debug("Task cancelled")
1446 return
1447 except Exception as e:
1448 self.log.debug(
1449 "_store_status exception: {}".format(str(e)), exc_info=True
1450 )
1451 pass
1452 finally:
1453 if run_once:
1454 return
1455
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the params into a YAML values file and build the "-f" option for it

    String values containing the "!!yaml" mark carry embedded YAML: the text
    after the first 7 characters is parsed (with the safe loader) and the
    parsed object is what gets written to the file. Any other value is written
    as it is.

    The file gets a random 10-digit name plus ".yaml" and is created relative
    to the current working directory; removing it is up to the caller.

    :param cluster_id: cluster whose paths are initialized before writing
    :param params: values to dump
    :return: tuple ("-f <file>", "<file>"), or ("", None) if there are no params
    """

    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    values = {}
    for key, value in params.items():
        # only text can carry the mark; other values must not be sliced/parsed
        if isinstance(value, str) and "!!yaml" in value:
            # explicit safe loader: yaml.load() without Loader is rejected by
            # recent PyYAML releases and must not build arbitrary objects
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        values[key] = value

    random_name = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    values_file = random_name + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(values, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1484
1485 # params for use in --set option
1486 @staticmethod
1487 def _params_to_set_option(params: dict) -> str:
1488 params_str = ""
1489 if params and len(params) > 0:
1490 start = True
1491 for key in params:
1492 value = params.get(key, None)
1493 if value is not None:
1494 if start:
1495 params_str += "--set "
1496 start = False
1497 else:
1498 params_str += ","
1499 params_str += "{}={}".format(key, value)
1500 return params_str
1501
1502 @staticmethod
1503 def generate_kdu_instance_name(**kwargs):
1504 chart_name = kwargs["kdu_model"]
1505 # check embeded chart (file or dir)
1506 if chart_name.startswith("/"):
1507 # extract file or directory name
1508 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1509 # check URL
1510 elif "://" in chart_name:
1511 # extract last portion of URL
1512 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1513
1514 name = ""
1515 for c in chart_name:
1516 if c.isalpha() or c.isnumeric():
1517 name += c
1518 else:
1519 name += "-"
1520 if len(name) > 35:
1521 name = name[0:35]
1522
1523 # if does not start with alpha character, prefix 'a'
1524 if not name[0].isalpha():
1525 name = "a" + name
1526
1527 name += "-"
1528
1529 def get_random_number():
1530 r = random.randrange(start=1, stop=99999999)
1531 s = str(r)
1532 s = s.rjust(10, "0")
1533 return s
1534
1535 name = name + get_random_number()
1536 return name.lower()