Fix bug 1636: remove the default 30s timeout in retry
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import os
30 import yaml
31 from uuid import uuid4
32
33 from n2vc.config import EnvironConfig
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45
46 service_account = "osm"
47
48 def __init__(
49 self,
50 fs: object,
51 db: object,
52 kubectl_command: str = "/usr/bin/kubectl",
53 helm_command: str = "/usr/bin/helm",
54 log: object = None,
55 on_update_db=None,
56 ):
57 """
58
59 :param fs: file system for kubernetes and helm configuration
60 :param db: database object to write current operation status
61 :param kubectl_command: path to kubectl executable
62 :param helm_command: path to helm executable
63 :param log: logger
64 :param on_update_db: callback called when k8s connector updates database
65 """
66
67 # parent class
68 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
69
70 self.log.info("Initializing K8S Helm connector")
71
72 self.config = EnvironConfig()
73 # random numbers for release name generation
74 random.seed(time.time())
75
76 # the file system
77 self.fs = fs
78
79 # exception if kubectl is not installed
80 self.kubectl_command = kubectl_command
81 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
82
83 # exception if helm is not installed
84 self._helm_command = helm_command
85 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
86
87 # obtain stable repo url from config or apply default
88 self._stable_repo_url = self.config.get("stablerepourl")
89 if self._stable_repo_url == "None":
90 self._stable_repo_url = None
91
92 @staticmethod
93 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
94 """
95     Parses a cluster_uuid as stored in the database; it can be either 'namespace:cluster_id'
96     or only cluster_id (kept for backward compatibility)
97 """
98 namespace, _, cluster_id = cluster_uuid.rpartition(":")
99 return namespace, cluster_id
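
    # Illustrative examples (documentation only) of the two accepted formats:
    #   _get_namespace_cluster_id("osm:1234-abcd") -> ("osm", "1234-abcd")
    #   _get_namespace_cluster_id("1234-abcd")     -> ("", "1234-abcd")   # legacy format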
100
101 async def init_env(
102 self,
103 k8s_creds: str,
104 namespace: str = "kube-system",
105 reuse_cluster_uuid=None,
106 **kwargs,
107 ) -> (str, bool):
108 """
109 It prepares a given K8s cluster environment to run Charts
110
111 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
112 '.kube/config'
113 :param namespace: optional namespace to be used for helm. By default,
114 'kube-system' will be used
115 :param reuse_cluster_uuid: existing cluster uuid for reuse
116 :param kwargs: Additional parameters (None yet)
117 :return: uuid of the K8s cluster and True if connector has installed some
118 software in the cluster
119 (on error, an exception will be raised)
120 """
121
122 if reuse_cluster_uuid:
123 namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
124 namespace = namespace_ or namespace
125 else:
126 cluster_id = str(uuid4())
127 cluster_uuid = "{}:{}".format(namespace, cluster_id)
128
129 self.log.debug(
130 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
131 )
132
133 paths, env = self._init_paths_env(
134 cluster_name=cluster_id, create_if_not_exist=True
135 )
136         # write the kubeconfig with owner-only permissions (open()'s third positional
137         # argument is buffering, so the file mode is applied with os.chmod instead)
138         with open(paths["kube_config"], "w") as f:
139             f.write(k8s_creds)
140         os.chmod(paths["kube_config"], stat.S_IRUSR | stat.S_IWUSR)
140
141 # Code with initialization specific of helm version
142 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
143
144 # sync fs with local data
145 self.fs.reverse_sync(from_path=cluster_id)
146
147 self.log.info("Cluster {} initialized".format(cluster_id))
148
149 return cluster_uuid, n2vc_installed_sw
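
    # Note (documentation only): the returned cluster_uuid is the composite string
    # "<namespace>:<cluster_id>", e.g. "kube-system:<uuid4>", which is the format that
    # _get_namespace_cluster_id() parses on subsequent calls such as repo_add() or reset().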
150
151 async def repo_add(
152 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
153 ):
154 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
155 self.log.debug(
156 "Cluster {}, adding {} repository {}. URL: {}".format(
157 cluster_id, repo_type, name, url
158 )
159 )
160
161 # sync local dir
162 self.fs.sync(from_path=cluster_id)
163
164 # init_env
165 paths, env = self._init_paths_env(
166 cluster_name=cluster_id, create_if_not_exist=True
167 )
168
169 # helm repo update
170 command = "{} repo update".format(self._helm_command)
171 self.log.debug("updating repo: {}".format(command))
172 await self._local_async_exec(
173 command=command, raise_exception_on_error=False, env=env
174 )
175
176 # helm repo add name url
177 command = "{} repo add {} {}".format(self._helm_command, name, url)
178 self.log.debug("adding repo: {}".format(command))
179 await self._local_async_exec(
180 command=command, raise_exception_on_error=True, env=env
181 )
182
183 # sync fs
184 self.fs.reverse_sync(from_path=cluster_id)
185
186 async def repo_list(self, cluster_uuid: str) -> list:
187 """
188 Get the list of registered repositories
189
190     :return: list of registered repositories: [ {"name": name, "url": url}, ... ]
191 """
192
193 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
194 self.log.debug("list repositories for cluster {}".format(cluster_id))
195
196 # sync local dir
197 self.fs.sync(from_path=cluster_id)
198
199 # config filename
200 paths, env = self._init_paths_env(
201 cluster_name=cluster_id, create_if_not_exist=True
202 )
203
204 command = "{} repo list --output yaml".format(self._helm_command)
205
206         # Do not raise an exception: if there are no repos we just want an empty list
207 output, _rc = await self._local_async_exec(
208 command=command, raise_exception_on_error=False, env=env
209 )
210
211 # sync fs
212 self.fs.reverse_sync(from_path=cluster_id)
213
214 if _rc == 0:
215 if output and len(output) > 0:
216 repos = yaml.load(output, Loader=yaml.SafeLoader)
217 # unify format between helm2 and helm3 setting all keys lowercase
218 return self._lower_keys_list(repos)
219 else:
220 return []
221 else:
222 return []
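
    # Minimal sketch (documentation only; assumes helm output of this shape): if
    # "helm repo list --output yaml" prints
    #   - Name: bitnami
    #     URL: https://charts.bitnami.com/bitnami
    # then repo_list() returns [{"name": "bitnami", "url": "https://charts.bitnami.com/bitnami"}]
    # once _lower_keys_list() has normalized the keys.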
223
224 async def repo_remove(self, cluster_uuid: str, name: str):
225
226 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
227 self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
228
229 # sync local dir
230 self.fs.sync(from_path=cluster_id)
231
232 # init env, paths
233 paths, env = self._init_paths_env(
234 cluster_name=cluster_id, create_if_not_exist=True
235 )
236
237 command = "{} repo remove {}".format(self._helm_command, name)
238 await self._local_async_exec(
239 command=command, raise_exception_on_error=True, env=env
240 )
241
242 # sync fs
243 self.fs.reverse_sync(from_path=cluster_id)
244
245 async def reset(
246 self,
247 cluster_uuid: str,
248 force: bool = False,
249 uninstall_sw: bool = False,
250 **kwargs,
251 ) -> bool:
252 """Reset a cluster
253
254 Resets the Kubernetes cluster by removing the helm deployment that represents it.
255
256 :param cluster_uuid: The UUID of the cluster to reset
257 :param force: Boolean to force the reset
258         :param uninstall_sw: Boolean to also uninstall the software installed by the connector (e.g. Tiller)
259 :param kwargs: Additional parameters (None yet)
260 :return: Returns True if successful or raises an exception.
261 """
262 namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
263 self.log.debug(
264 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
265 cluster_id, uninstall_sw
266 )
267 )
268
269 # sync local dir
270 self.fs.sync(from_path=cluster_id)
271
272 # uninstall releases if needed.
273 if uninstall_sw:
274 releases = await self.instances_list(cluster_uuid=cluster_uuid)
275 if len(releases) > 0:
276 if force:
277 for r in releases:
278 try:
279 kdu_instance = r.get("name")
280 chart = r.get("chart")
281 self.log.debug(
282 "Uninstalling {} -> {}".format(chart, kdu_instance)
283 )
284 await self.uninstall(
285 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
286 )
287 except Exception as e:
288                                 # do not raise: uninstalling previously installed
289                                 # helm releases has been seen to fail in some cases,
290                                 # and the reset should continue anyway
291 self.log.warn(
292 "Error uninstalling release {}: {}".format(
293 kdu_instance, e
294 )
295 )
296 else:
297                     msg = (
298                         "Cluster uuid: {} has releases and force is not set. Leaving K8s helm environment"
299                     ).format(cluster_id)
300 self.log.warn(msg)
301 uninstall_sw = (
302 False # Allow to remove k8s cluster without removing Tiller
303 )
304
305 if uninstall_sw:
306 await self._uninstall_sw(cluster_id, namespace)
307
308 # delete cluster directory
309 self.log.debug("Removing directory {}".format(cluster_id))
310 self.fs.file_delete(cluster_id, ignore_non_exist=True)
311         # Also remove the local directory if it still exists
312 direct = self.fs.path + "/" + cluster_id
313 shutil.rmtree(direct, ignore_errors=True)
314
315 return True
316
317 async def _install_impl(
318 self,
319 cluster_id: str,
320 kdu_model: str,
321 paths: dict,
322 env: dict,
323 kdu_instance: str,
324 atomic: bool = True,
325 timeout: float = 300,
326 params: dict = None,
327 db_dict: dict = None,
328 kdu_name: str = None,
329 namespace: str = None,
330 ):
331 # params to str
332 params_str, file_to_delete = self._params_to_file_option(
333 cluster_id=cluster_id, params=params
334 )
335
336 # version
337 version = None
338 if ":" in kdu_model:
339 parts = kdu_model.split(sep=":")
340 if len(parts) == 2:
341 version = str(parts[1])
342 kdu_model = parts[0]
343
344 command = self._get_install_command(
345 kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
346 )
347
348 self.log.debug("installing: {}".format(command))
349
350 if atomic:
351 # exec helm in a task
352 exec_task = asyncio.ensure_future(
353 coro_or_future=self._local_async_exec(
354 command=command, raise_exception_on_error=False, env=env
355 )
356 )
357
358 # write status in another task
359 status_task = asyncio.ensure_future(
360 coro_or_future=self._store_status(
361 cluster_id=cluster_id,
362 kdu_instance=kdu_instance,
363 namespace=namespace,
364 db_dict=db_dict,
365 operation="install",
366 run_once=False,
367 )
368 )
369
370 # wait for execution task
371 await asyncio.wait([exec_task])
372
373 # cancel status task
374 status_task.cancel()
375
376 output, rc = exec_task.result()
377
378 else:
379
380 output, rc = await self._local_async_exec(
381 command=command, raise_exception_on_error=False, env=env
382 )
383
384 # remove temporal values yaml file
385 if file_to_delete:
386 os.remove(file_to_delete)
387
388 # write final status
389 await self._store_status(
390 cluster_id=cluster_id,
391 kdu_instance=kdu_instance,
392 namespace=namespace,
393 db_dict=db_dict,
394 operation="install",
395 run_once=True,
396 check_every=0,
397 )
398
399 if rc != 0:
400 msg = "Error executing command: {}\nOutput: {}".format(command, output)
401 self.log.error(msg)
402 raise K8sException(msg)
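
    # Illustrative example (documentation only) of the version split used above:
    #   "stable/openldap:1.2.7".split(":") -> ["stable/openldap", "1.2.7"]
    #   => kdu_model = "stable/openldap", version = "1.2.7"
    # A kdu_model without ":" keeps version = None and helm resolves the latest chart version.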
403
404 async def upgrade(
405 self,
406 cluster_uuid: str,
407 kdu_instance: str,
408 kdu_model: str = None,
409 atomic: bool = True,
410 timeout: float = 300,
411 params: dict = None,
412 db_dict: dict = None,
413 ):
414 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
415 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
416
417 # sync local dir
418 self.fs.sync(from_path=cluster_id)
419
420 # look for instance to obtain namespace
421 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
422 if not instance_info:
423 raise K8sException("kdu_instance {} not found".format(kdu_instance))
424
425 # init env, paths
426 paths, env = self._init_paths_env(
427 cluster_name=cluster_id, create_if_not_exist=True
428 )
429
430 # params to str
431 params_str, file_to_delete = self._params_to_file_option(
432 cluster_id=cluster_id, params=params
433 )
434
435 # version
436 version = None
437 if ":" in kdu_model:
438 parts = kdu_model.split(sep=":")
439 if len(parts) == 2:
440 version = str(parts[1])
441 kdu_model = parts[0]
442
443 command = self._get_upgrade_command(
444 kdu_model,
445 kdu_instance,
446 instance_info["namespace"],
447 params_str,
448 version,
449 atomic,
450 timeout,
451 )
452
453 self.log.debug("upgrading: {}".format(command))
454
455 if atomic:
456
457 # exec helm in a task
458 exec_task = asyncio.ensure_future(
459 coro_or_future=self._local_async_exec(
460 command=command, raise_exception_on_error=False, env=env
461 )
462 )
463 # write status in another task
464 status_task = asyncio.ensure_future(
465 coro_or_future=self._store_status(
466 cluster_id=cluster_id,
467 kdu_instance=kdu_instance,
468 namespace=instance_info["namespace"],
469 db_dict=db_dict,
470 operation="upgrade",
471 run_once=False,
472 )
473 )
474
475 # wait for execution task
476 await asyncio.wait([exec_task])
477
478 # cancel status task
479 status_task.cancel()
480 output, rc = exec_task.result()
481
482 else:
483
484 output, rc = await self._local_async_exec(
485 command=command, raise_exception_on_error=False, env=env
486 )
487
488 # remove temporal values yaml file
489 if file_to_delete:
490 os.remove(file_to_delete)
491
492 # write final status
493 await self._store_status(
494 cluster_id=cluster_id,
495 kdu_instance=kdu_instance,
496 namespace=instance_info["namespace"],
497 db_dict=db_dict,
498 operation="upgrade",
499 run_once=True,
500 check_every=0,
501 )
502
503 if rc != 0:
504 msg = "Error executing command: {}\nOutput: {}".format(command, output)
505 self.log.error(msg)
506 raise K8sException(msg)
507
508 # sync fs
509 self.fs.reverse_sync(from_path=cluster_id)
510
511 # return new revision number
512 instance = await self.get_instance_info(
513 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
514 )
515 if instance:
516 revision = int(instance.get("revision"))
517 self.log.debug("New revision: {}".format(revision))
518 return revision
519 else:
520 return 0
521
522 async def scale(
523 self,
524 kdu_instance: str,
525 scale: int,
526 resource_name: str,
527 total_timeout: float = 1800,
528 **kwargs,
529 ):
530 raise NotImplementedError("Method not implemented")
531
532 async def get_scale_count(
533 self,
534 resource_name: str,
535 kdu_instance: str,
536 **kwargs,
537 ):
538 raise NotImplementedError("Method not implemented")
539
540 async def rollback(
541 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
542 ):
543
544 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
545 self.log.debug(
546 "rollback kdu_instance {} to revision {} from cluster {}".format(
547 kdu_instance, revision, cluster_id
548 )
549 )
550
551 # sync local dir
552 self.fs.sync(from_path=cluster_id)
553
554 # look for instance to obtain namespace
555 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
556 if not instance_info:
557 raise K8sException("kdu_instance {} not found".format(kdu_instance))
558
559 # init env, paths
560 paths, env = self._init_paths_env(
561 cluster_name=cluster_id, create_if_not_exist=True
562 )
563
564 command = self._get_rollback_command(
565 kdu_instance, instance_info["namespace"], revision
566 )
567
568 self.log.debug("rolling_back: {}".format(command))
569
570 # exec helm in a task
571 exec_task = asyncio.ensure_future(
572 coro_or_future=self._local_async_exec(
573 command=command, raise_exception_on_error=False, env=env
574 )
575 )
576 # write status in another task
577 status_task = asyncio.ensure_future(
578 coro_or_future=self._store_status(
579 cluster_id=cluster_id,
580 kdu_instance=kdu_instance,
581 namespace=instance_info["namespace"],
582 db_dict=db_dict,
583 operation="rollback",
584 run_once=False,
585 )
586 )
587
588 # wait for execution task
589 await asyncio.wait([exec_task])
590
591 # cancel status task
592 status_task.cancel()
593
594 output, rc = exec_task.result()
595
596 # write final status
597 await self._store_status(
598 cluster_id=cluster_id,
599 kdu_instance=kdu_instance,
600 namespace=instance_info["namespace"],
601 db_dict=db_dict,
602 operation="rollback",
603 run_once=True,
604 check_every=0,
605 )
606
607 if rc != 0:
608 msg = "Error executing command: {}\nOutput: {}".format(command, output)
609 self.log.error(msg)
610 raise K8sException(msg)
611
612 # sync fs
613 self.fs.reverse_sync(from_path=cluster_id)
614
615 # return new revision number
616 instance = await self.get_instance_info(
617 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
618 )
619 if instance:
620 revision = int(instance.get("revision"))
621 self.log.debug("New revision: {}".format(revision))
622 return revision
623 else:
624 return 0
625
626 async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
627 """
628         Removes an existing KDU instance. It implicitly uses the helm `delete` or `uninstall`
629         command (this call should happen after all _terminate-config-primitive_ of the VNF
630         are invoked).
631
632 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
633 :param kdu_instance: unique name for the KDU instance to be deleted
634 :param kwargs: Additional parameters (None yet)
635 :return: True if successful
636 """
637
638 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
639 self.log.debug(
640 "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
641 )
642
643 # sync local dir
644 self.fs.sync(from_path=cluster_id)
645
646 # look for instance to obtain namespace
647 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
648 if not instance_info:
649 self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
650 return True
651 # init env, paths
652 paths, env = self._init_paths_env(
653 cluster_name=cluster_id, create_if_not_exist=True
654 )
655
656 command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
657 output, _rc = await self._local_async_exec(
658 command=command, raise_exception_on_error=True, env=env
659 )
660
661 # sync fs
662 self.fs.reverse_sync(from_path=cluster_id)
663
664 return self._output_to_table(output)
665
666 async def instances_list(self, cluster_uuid: str) -> list:
667 """
668 returns a list of deployed releases in a cluster
669
670 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
671 :return:
672 """
673
674 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
675 self.log.debug("list releases for cluster {}".format(cluster_id))
676
677 # sync local dir
678 self.fs.sync(from_path=cluster_id)
679
680 # execute internal command
681 result = await self._instances_list(cluster_id)
682
683 # sync fs
684 self.fs.reverse_sync(from_path=cluster_id)
685
686 return result
687
688 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
689 instances = await self.instances_list(cluster_uuid=cluster_uuid)
690 for instance in instances:
691 if instance.get("name") == kdu_instance:
692 return instance
693 self.log.debug("Instance {} not found".format(kdu_instance))
694 return None
695
696 async def exec_primitive(
697 self,
698 cluster_uuid: str = None,
699 kdu_instance: str = None,
700 primitive_name: str = None,
701 timeout: float = 300,
702 params: dict = None,
703 db_dict: dict = None,
704 **kwargs,
705 ) -> str:
706 """Exec primitive (Juju action)
707
708 :param cluster_uuid: The UUID of the cluster or namespace:cluster
709 :param kdu_instance: The unique name of the KDU instance
710 :param primitive_name: Name of action that will be executed
711 :param timeout: Timeout for action execution
712 :param params: Dictionary of all the parameters needed for the action
713 :db_dict: Dictionary for any additional data
714 :param kwargs: Additional parameters (None yet)
715
716 :return: Returns the output of the action
717 """
718 raise K8sException(
719 "KDUs deployed with Helm don't support actions "
720 "different from rollback, upgrade and status"
721 )
722
723 async def get_services(
724 self, cluster_uuid: str, kdu_instance: str, namespace: str
725 ) -> list:
726 """
727 Returns a list of services defined for the specified kdu instance.
728
729 :param cluster_uuid: UUID of a K8s cluster known by OSM
730 :param kdu_instance: unique name for the KDU instance
731 :param namespace: K8s namespace used by the KDU instance
732         :return: If successful, it will return a list of services. Each service
733         can have the following data:
734 - `name` of the service
735         - `type` type of service in the k8s cluster
736         - `ports` List of ports offered by the service; each port includes at least
737         name, port and protocol
738 - `cluster_ip` Internal ip to be used inside k8s cluster
739 - `external_ip` List of external ips (in case they are available)
740 """
741
742 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
743 self.log.debug(
744 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
745 cluster_uuid, kdu_instance
746 )
747 )
748
749 # sync local dir
750 self.fs.sync(from_path=cluster_id)
751
752 # get list of services names for kdu
753 service_names = await self._get_services(cluster_id, kdu_instance, namespace)
754
755 service_list = []
756 for service in service_names:
757 service = await self._get_service(cluster_id, service, namespace)
758 service_list.append(service)
759
760 # sync fs
761 self.fs.reverse_sync(from_path=cluster_id)
762
763 return service_list
764
765 async def get_service(
766 self, cluster_uuid: str, service_name: str, namespace: str
767 ) -> object:
768
769 self.log.debug(
770 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
771 service_name, namespace, cluster_uuid
772 )
773 )
774
775 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
776
777 # sync local dir
778 self.fs.sync(from_path=cluster_id)
779
780 service = await self._get_service(cluster_id, service_name, namespace)
781
782 # sync fs
783 self.fs.reverse_sync(from_path=cluster_id)
784
785 return service
786
787 async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
788 """
789         This call retrieves the current state of a given KDU instance. It allows to
790         retrieve the _composition_ (i.e. K8s objects) and the _specific values_ of the
791         configuration parameters applied to a given instance. This call is based on the
792         `status` call.
793
794 :param cluster_uuid: UUID of a K8s cluster known by OSM
795 :param kdu_instance: unique name for the KDU instance
796 :param kwargs: Additional parameters (None yet)
797 :return: If successful, it will return the following vector of arguments:
798 - K8s `namespace` in the cluster where the KDU lives
799 - `state` of the KDU instance. It can be:
800 - UNKNOWN
801 - DEPLOYED
802 - DELETED
803 - SUPERSEDED
804 - FAILED or
805 - DELETING
806 - List of `resources` (objects) that this release consists of, sorted by kind,
807 and the status of those resources
808 - Last `deployment_time`.
809
810 """
811 self.log.debug(
812 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
813 cluster_uuid, kdu_instance
814 )
815 )
816
817 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
818
819 # sync local dir
820 self.fs.sync(from_path=cluster_id)
821
822 # get instance: needed to obtain namespace
823 instances = await self._instances_list(cluster_id=cluster_id)
824 for instance in instances:
825 if instance.get("name") == kdu_instance:
826 break
827 else:
828 # instance does not exist
829 raise K8sException(
830 "Instance name: {} not found in cluster: {}".format(
831 kdu_instance, cluster_id
832 )
833 )
834
835 status = await self._status_kdu(
836 cluster_id=cluster_id,
837 kdu_instance=kdu_instance,
838 namespace=instance["namespace"],
839 show_error_log=True,
840 return_text=True,
841 )
842
843 # sync fs
844 self.fs.reverse_sync(from_path=cluster_id)
845
846 return status
847
848 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
849
850 self.log.debug(
851 "inspect kdu_model values {} from (optional) repo: {}".format(
852 kdu_model, repo_url
853 )
854 )
855
856 return await self._exec_inspect_comand(
857 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
858 )
859
860 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
861
862 self.log.debug(
863 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
864 )
865
866 return await self._exec_inspect_comand(
867 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
868 )
869
870 async def synchronize_repos(self, cluster_uuid: str):
871
872 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
873 try:
874 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
875 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
876
877 local_repo_list = await self.repo_list(cluster_uuid)
878 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
879
880 deleted_repo_list = []
881 added_repo_dict = {}
882
883 # iterate over the list of repos in the database that should be
884 # added if not present
885 for repo_name, db_repo in db_repo_dict.items():
886 try:
887 # check if it is already present
888 curr_repo_url = local_repo_dict.get(db_repo["name"])
889 repo_id = db_repo.get("_id")
890 if curr_repo_url != db_repo["url"]:
891 if curr_repo_url:
892 self.log.debug(
893                                 "repo {} url changed, delete and add again".format(
894 db_repo["url"]
895 )
896 )
897 await self.repo_remove(cluster_uuid, db_repo["name"])
898 deleted_repo_list.append(repo_id)
899
900 # add repo
901 self.log.debug("add repo {}".format(db_repo["name"]))
902 await self.repo_add(
903 cluster_uuid, db_repo["name"], db_repo["url"]
904 )
905 added_repo_dict[repo_id] = db_repo["name"]
906 except Exception as e:
907 raise K8sException(
908 "Error adding repo id: {}, err_msg: {} ".format(
909 repo_id, repr(e)
910 )
911 )
912
913             # Delete repos that are present locally but not in the database
914 for repo_name in local_repo_dict:
915 if not db_repo_dict.get(repo_name) and repo_name != "stable":
916 self.log.debug("delete repo {}".format(repo_name))
917 try:
918 await self.repo_remove(cluster_uuid, repo_name)
919 deleted_repo_list.append(repo_name)
920 except Exception as e:
921                         self.log.warning(
922 "Error deleting repo, name: {}, err_msg: {}".format(
923 repo_name, str(e)
924 )
925 )
926
927 return deleted_repo_list, added_repo_dict
928
929 except K8sException:
930 raise
931 except Exception as e:
932             # Do not raise a K8sException; log and re-raise as a generic error
933 self.log.error("Error synchronizing repos: {}".format(e))
934 raise Exception("Error synchronizing repos: {}".format(e))
935
936 def _get_db_repos_dict(self, repo_ids: list):
937 db_repos_dict = {}
938 for repo_id in repo_ids:
939 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
940 db_repos_dict[db_repo["name"]] = db_repo
941 return db_repos_dict
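
    # Minimal sketch (documentation only; hypothetical document shape): a "k8srepos"
    # entry such as {"_id": "r1", "name": "bitnami", "url": "https://..."} ends up keyed
    # by its name:
    #   {"bitnami": {"_id": "r1", "name": "bitnami", "url": "https://..."}}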
942
943 """
944 ####################################################################################
945 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
946 ####################################################################################
947 """
948
949 @abc.abstractmethod
950 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
951 """
952         Creates the base cluster and kube dirs and returns them.
953         Also creates the helm3 dirs according to the new directory specification; these paths
954         are not returned but are assigned to helm environment variables
955
956 :param cluster_name: cluster_name
957 :return: Dictionary with config_paths and dictionary with helm environment variables
958 """
959
960 @abc.abstractmethod
961 async def _cluster_init(self, cluster_id, namespace, paths, env):
962 """
963 Implements the helm version dependent cluster initialization
964 """
965
966 @abc.abstractmethod
967 async def _instances_list(self, cluster_id):
968 """
969 Implements the helm version dependent helm instances list
970 """
971
972 @abc.abstractmethod
973 async def _get_services(self, cluster_id, kdu_instance, namespace):
974 """
975 Implements the helm version dependent method to obtain services from a helm instance
976 """
977
978 @abc.abstractmethod
979 async def _status_kdu(
980 self,
981 cluster_id: str,
982 kdu_instance: str,
983 namespace: str = None,
984 show_error_log: bool = False,
985 return_text: bool = False,
986 ):
987 """
988 Implements the helm version dependent method to obtain status of a helm instance
989 """
990
991 @abc.abstractmethod
992 def _get_install_command(
993 self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
994 ) -> str:
995 """
996         Obtain command to be executed to install the indicated instance
997 """
998
999 @abc.abstractmethod
1000 def _get_upgrade_command(
1001 self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
1002 ) -> str:
1003 """
1004 Obtain command to be executed to upgrade the indicated instance
1005 """
1006
1007 @abc.abstractmethod
1008 def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
1009 """
1010 Obtain command to be executed to rollback the indicated instance
1011 """
1012
1013 @abc.abstractmethod
1014 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
1015 """
1016 Obtain command to be executed to delete the indicated instance
1017 """
1018
1019 @abc.abstractmethod
1020 def _get_inspect_command(
1021 self, show_command: str, kdu_model: str, repo_str: str, version: str
1022 ):
1023 """
1024 Obtain command to be executed to obtain information about the kdu
1025 """
1026
1027 @abc.abstractmethod
1028 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1029 """
1030         Method called to uninstall cluster software for helm. The behaviour depends on
1031         the helm version:
1032         For Helm v2 it will be called when Tiller must be uninstalled
1033         For Helm v3 it does nothing and does not need to be called
1034 """
1035
1036 @abc.abstractmethod
1037 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1038 """
1039 Obtains the cluster repos identifiers
1040 """
1041
1042 """
1043 ####################################################################################
1044 ################################### P R I V A T E ##################################
1045 ####################################################################################
1046 """
1047
1048 @staticmethod
1049 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1050 if os.path.exists(filename):
1051 return True
1052 else:
1053 msg = "File {} does not exist".format(filename)
1054 if exception_if_not_exists:
1055 raise K8sException(msg)
1056
1057 @staticmethod
1058 def _remove_multiple_spaces(strobj):
1059 strobj = strobj.strip()
1060 while " " in strobj:
1061 strobj = strobj.replace(" ", " ")
1062 return strobj
1063
1064 @staticmethod
1065 def _output_to_lines(output: str) -> list:
1066 output_lines = list()
1067 lines = output.splitlines(keepends=False)
1068 for line in lines:
1069 line = line.strip()
1070 if len(line) > 0:
1071 output_lines.append(line)
1072 return output_lines
1073
1074 @staticmethod
1075 def _output_to_table(output: str) -> list:
1076 output_table = list()
1077 lines = output.splitlines(keepends=False)
1078 for line in lines:
1079 line = line.replace("\t", " ")
1080 line_list = list()
1081 output_table.append(line_list)
1082 cells = line.split(sep=" ")
1083 for cell in cells:
1084 cell = cell.strip()
1085 if len(cell) > 0:
1086 line_list.append(cell)
1087 return output_table
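
    # Illustrative example (documentation only):
    #   _output_to_table("NAME\tREADY\nweb  1/1")
    #   -> [["NAME", "READY"], ["web", "1/1"]]
    # Tabs are converted to spaces and empty cells are discarded.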
1088
1089 @staticmethod
1090 def _parse_services(output: str) -> list:
1091 lines = output.splitlines(keepends=False)
1092 services = []
1093 for line in lines:
1094 line = line.replace("\t", " ")
1095 cells = line.split(sep=" ")
1096 if len(cells) > 0 and cells[0].startswith("service/"):
1097 elems = cells[0].split(sep="/")
1098 if len(elems) > 1:
1099 services.append(elems[1])
1100 return services
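
    # Illustrative example (documentation only; assumes kubectl-style output):
    #   _parse_services("service/web   ClusterIP\npod/web-0   Running")
    #   -> ["web"]
    # Only lines whose first cell starts with "service/" contribute a service name.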
1101
1102 @staticmethod
1103 def _get_deep(dictionary: dict, members: tuple):
1104 target = dictionary
1105 value = None
1106 try:
1107 for m in members:
1108 value = target.get(m)
1109 if not value:
1110 return None
1111 else:
1112 target = value
1113 except Exception:
1114 pass
1115 return value
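
    # Illustrative example (documentation only), mirroring its use in _get_service():
    #   _get_deep({"spec": {"type": "ClusterIP"}}, ("spec", "type")) -> "ClusterIP"
    #   _get_deep({"spec": {}}, ("spec", "type"))                    -> None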
1116
1117 # find key:value in several lines
1118 @staticmethod
1119 def _find_in_lines(p_lines: list, p_key: str) -> str:
1120 for line in p_lines:
1121 try:
1122 if line.startswith(p_key + ":"):
1123 parts = line.split(":")
1124 the_value = parts[1].strip()
1125 return the_value
1126 except Exception:
1127 # ignore it
1128 pass
1129 return None
1130
1131 @staticmethod
1132 def _lower_keys_list(input_list: list):
1133 """
1134 Transform the keys in a list of dictionaries to lower case and returns a new list
1135 of dictionaries
1136 """
1137 new_list = []
1138 if input_list:
1139 for dictionary in input_list:
1140 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1141 new_list.append(new_dict)
1142 return new_list
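
    # Illustrative example (documentation only), unifying helm2/helm3 key casing:
    #   _lower_keys_list([{"Name": "bitnami", "URL": "https://charts.bitnami.com/bitnami"}])
    #   -> [{"name": "bitnami", "url": "https://charts.bitnami.com/bitnami"}]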
1143
1144 async def _local_async_exec(
1145 self,
1146 command: str,
1147 raise_exception_on_error: bool = False,
1148 show_error_log: bool = True,
1149 encode_utf8: bool = False,
1150 env: dict = None,
1151 ) -> (str, int):
1152
1153 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1154 self.log.debug(
1155 "Executing async local command: {}, env: {}".format(command, env)
1156 )
1157
1158 # split command
1159 command = shlex.split(command)
1160
1161 environ = os.environ.copy()
1162 if env:
1163 environ.update(env)
1164
1165 try:
1166 process = await asyncio.create_subprocess_exec(
1167 *command,
1168 stdout=asyncio.subprocess.PIPE,
1169 stderr=asyncio.subprocess.PIPE,
1170 env=environ,
1171 )
1172
1173 # wait for command terminate
1174 stdout, stderr = await process.communicate()
1175
1176 return_code = process.returncode
1177
1178 output = ""
1179 if stdout:
1180 output = stdout.decode("utf-8").strip()
1181 # output = stdout.decode()
1182 if stderr:
1183 output = stderr.decode("utf-8").strip()
1184 # output = stderr.decode()
1185
1186 if return_code != 0 and show_error_log:
1187 self.log.debug(
1188 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1189 )
1190 else:
1191 self.log.debug("Return code: {}".format(return_code))
1192
1193 if raise_exception_on_error and return_code != 0:
1194 raise K8sException(output)
1195
1196 if encode_utf8:
1197 output = output.encode("utf-8").strip()
1198 output = str(output).replace("\\n", "\n")
1199
1200 return output, return_code
1201
1202 except asyncio.CancelledError:
1203 raise
1204 except K8sException:
1205 raise
1206 except Exception as e:
1207 msg = "Exception executing command: {} -> {}".format(command, e)
1208 self.log.error(msg)
1209 if raise_exception_on_error:
1210 raise K8sException(e) from e
1211 else:
1212 return "", -1
1213
1214 async def _local_async_exec_pipe(
1215 self,
1216 command1: str,
1217 command2: str,
1218 raise_exception_on_error: bool = True,
1219 show_error_log: bool = True,
1220 encode_utf8: bool = False,
1221 env: dict = None,
1222 ):
1223
1224 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1225 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1226 command = "{} | {}".format(command1, command2)
1227 self.log.debug(
1228 "Executing async local command: {}, env: {}".format(command, env)
1229 )
1230
1231 # split command
1232 command1 = shlex.split(command1)
1233 command2 = shlex.split(command2)
1234
1235 environ = os.environ.copy()
1236 if env:
1237 environ.update(env)
1238
1239 try:
1240 read, write = os.pipe()
1241 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1242 os.close(write)
1243 process_2 = await asyncio.create_subprocess_exec(
1244 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1245 )
1246 os.close(read)
1247 stdout, stderr = await process_2.communicate()
1248
1249 return_code = process_2.returncode
1250
1251 output = ""
1252 if stdout:
1253 output = stdout.decode("utf-8").strip()
1254 # output = stdout.decode()
1255 if stderr:
1256 output = stderr.decode("utf-8").strip()
1257 # output = stderr.decode()
1258
1259 if return_code != 0 and show_error_log:
1260 self.log.debug(
1261 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1262 )
1263 else:
1264 self.log.debug("Return code: {}".format(return_code))
1265
1266 if raise_exception_on_error and return_code != 0:
1267 raise K8sException(output)
1268
1269 if encode_utf8:
1270 output = output.encode("utf-8").strip()
1271 output = str(output).replace("\\n", "\n")
1272
1273 return output, return_code
1274 except asyncio.CancelledError:
1275 raise
1276 except K8sException:
1277 raise
1278 except Exception as e:
1279 msg = "Exception executing command: {} -> {}".format(command, e)
1280 self.log.error(msg)
1281 if raise_exception_on_error:
1282 raise K8sException(e) from e
1283 else:
1284 return "", -1
1285
1286 async def _get_service(self, cluster_id, service_name, namespace):
1287 """
1288         Obtains the data of the specified service in the k8s cluster.
1289
1290 :param cluster_id: id of a K8s cluster known by OSM
1291 :param service_name: name of the K8s service in the specified namespace
1292 :param namespace: K8s namespace used by the KDU instance
1293 :return: If successful, it will return a service with the following data:
1294 - `name` of the service
1295         - `type` type of service in the k8s cluster
1296         - `ports` List of ports offered by the service; each port includes at least
1297         name, port and protocol
1298 - `cluster_ip` Internal ip to be used inside k8s cluster
1299 - `external_ip` List of external ips (in case they are available)
1300 """
1301
1302 # init config, env
1303 paths, env = self._init_paths_env(
1304 cluster_name=cluster_id, create_if_not_exist=True
1305 )
1306
1307 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1308 self.kubectl_command, paths["kube_config"], namespace, service_name
1309 )
1310
1311 output, _rc = await self._local_async_exec(
1312 command=command, raise_exception_on_error=True, env=env
1313 )
1314
1315 data = yaml.load(output, Loader=yaml.SafeLoader)
1316
1317 service = {
1318 "name": service_name,
1319 "type": self._get_deep(data, ("spec", "type")),
1320 "ports": self._get_deep(data, ("spec", "ports")),
1321 "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
1322 }
1323 if service["type"] == "LoadBalancer":
1324             ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
1325             ip_list = [elem["ip"] for elem in ip_map_list]
1326 service["external_ip"] = ip_list
1327
1328 return service
1329
1330 async def _exec_inspect_comand(
1331 self, inspect_command: str, kdu_model: str, repo_url: str = None
1332 ):
1333 """
1334 Obtains information about a kdu, no cluster (no env)
1335 """
1336
1337 repo_str = ""
1338 if repo_url:
1339 repo_str = " --repo {}".format(repo_url)
1340
1341 idx = kdu_model.find("/")
1342 if idx >= 0:
1343 idx += 1
1344 kdu_model = kdu_model[idx:]
1345
1346 version = ""
1347 if ":" in kdu_model:
1348 parts = kdu_model.split(sep=":")
1349 if len(parts) == 2:
1350 version = "--version {}".format(str(parts[1]))
1351 kdu_model = parts[0]
1352
1353 full_command = self._get_inspect_command(
1354 inspect_command, kdu_model, repo_str, version
1355 )
1356 output, _rc = await self._local_async_exec(
1357 command=full_command, encode_utf8=True
1358 )
1359
1360 return output
1361
1362 async def _store_status(
1363 self,
1364 cluster_id: str,
1365 operation: str,
1366 kdu_instance: str,
1367 namespace: str = None,
1368 check_every: float = 10,
1369 db_dict: dict = None,
1370 run_once: bool = False,
1371 ):
1372 while True:
1373 try:
1374 await asyncio.sleep(check_every)
1375 detailed_status = await self._status_kdu(
1376 cluster_id=cluster_id,
1377 kdu_instance=kdu_instance,
1378 namespace=namespace,
1379 return_text=False,
1380 )
1381 status = detailed_status.get("info").get("description")
1382 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1383 # write status to db
1384 result = await self.write_app_status_to_db(
1385 db_dict=db_dict,
1386 status=str(status),
1387 detailed_status=str(detailed_status),
1388 operation=operation,
1389 )
1390 if not result:
1391 self.log.info("Error writing in database. Task exiting...")
1392 return
1393 except asyncio.CancelledError:
1394 self.log.debug("Task cancelled")
1395 return
1396 except Exception as e:
1397 self.log.debug(
1398 "_store_status exception: {}".format(str(e)), exc_info=True
1399 )
1400 pass
1401 finally:
1402 if run_once:
1403 return
1404
1405 # params for use in -f file
1406 # returns values file option and filename (in order to delete it at the end)
1407 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1408
1409 if params and len(params) > 0:
1410 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1411
1412 def get_random_number():
1413 r = random.randrange(start=1, stop=99999999)
1414 s = str(r)
1415 while len(s) < 10:
1416 s = "0" + s
1417 return s
1418
1419 params2 = dict()
1420 for key in params:
1421 value = params.get(key)
1422 if "!!yaml" in str(value):
1423                     value = yaml.load(value[7:], Loader=yaml.SafeLoader)
1424 params2[key] = value
1425
1426 values_file = get_random_number() + ".yaml"
1427 with open(values_file, "w") as stream:
1428 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1429
1430 return "-f {}".format(values_file), values_file
1431
1432 return "", None
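
    # Minimal sketch (documentation only; hypothetical params):
    #   _params_to_file_option(cluster_id, {"replicaCount": 2})
    #   -> ("-f 0012345678.yaml", "0012345678.yaml")   # 10-digit random file name
    # Values prefixed with "!!yaml " are parsed as YAML before being dumped to the file.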
1433
1434 # params for use in --set option
1435 @staticmethod
1436 def _params_to_set_option(params: dict) -> str:
1437 params_str = ""
1438 if params and len(params) > 0:
1439 start = True
1440 for key in params:
1441 value = params.get(key, None)
1442 if value is not None:
1443 if start:
1444 params_str += "--set "
1445 start = False
1446 else:
1447 params_str += ","
1448 params_str += "{}={}".format(key, value)
1449 return params_str
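
    # Illustrative example (documentation only):
    #   _params_to_set_option({"replicaCount": 2, "image.tag": "1.0"})
    #   -> "--set replicaCount=2,image.tag=1.0"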
1450
1451 @staticmethod
1452 def generate_kdu_instance_name(**kwargs):
1453 chart_name = kwargs["kdu_model"]
1454         # check embedded chart (file or dir)
1455 if chart_name.startswith("/"):
1456 # extract file or directory name
1457 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1458 # check URL
1459 elif "://" in chart_name:
1460 # extract last portion of URL
1461 chart_name = chart_name[chart_name.rfind("/") + 1 :]
1462
1463 name = ""
1464 for c in chart_name:
1465 if c.isalpha() or c.isnumeric():
1466 name += c
1467 else:
1468 name += "-"
1469 if len(name) > 35:
1470 name = name[0:35]
1471
1472         # if it does not start with an alphabetic character, prefix 'a'
1473 if not name[0].isalpha():
1474 name = "a" + name
1475
1476 name += "-"
1477
1478 def get_random_number():
1479 r = random.randrange(start=1, stop=99999999)
1480 s = str(r)
1481 s = s.rjust(10, "0")
1482 return s
1483
1484 name = name + get_random_number()
1485 return name.lower()
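
    # Illustrative example (documentation only): for kdu_model="stable/openldap:1.2.7"
    # every non-alphanumeric character becomes "-", the name is truncated to 35 characters,
    # prefixed with "a" only if it does not start with a letter, and suffixed with a random
    # 10-digit number, e.g.:
    #   generate_kdu_instance_name(kdu_model="stable/openldap:1.2.7")
    #   -> "stable-openldap-1-2-7-0012345678"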