Upgrade to libjuju 2.9.2
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import subprocess
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.config import EnvironConfig
35 from n2vc.exceptions import K8sException
36 from n2vc.k8s_conn import K8sConnector
37
38
39 class K8sHelmBaseConnector(K8sConnector):
40
41 """
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
45 """
46
47 service_account = "osm"
48
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
):
    """
    Initialize the state shared by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :raises K8sException: if the kubectl or helm executable does not exist
    """

    # Initialize the generic connector first; self.log is used right below,
    # so it is expected to be provided by the parent initializer.
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

    self.log.info("Initializing K8S Helm connector")

    self.config = EnvironConfig()

    # seed the random generator (random numbers for release name generation)
    random.seed(time.time())

    # the file system
    self.fs = fs

    # kubectl must be installed, otherwise an exception is raised
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    # helm must be installed, otherwise an exception is raised
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url from config; the literal string "None" means "not set"
    configured_repo_url = self.config.get("stablerepourl")
    self._stable_repo_url = (
        None if configured_repo_url == "None" else configured_repo_url
    )
92
93 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95 """
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
98 """
99 namespace, _, cluster_id = cluster_uuid.rpartition(":")
100 return namespace, cluster_id
101
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """

    if reuse_cluster_uuid:
        # a namespace embedded in the reused uuid wins over the argument
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # Write the credentials to a file that is owner read/write only from the
    # moment it is created. The built-in open() takes "buffering" as its third
    # positional argument, not a permission mode, so os.open() is used to pass
    # the creation mode.
    kube_config_mode = stat.S_IRUSR | stat.S_IWUSR
    kube_config_fd = os.open(
        paths["kube_config"],
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        kube_config_mode,
    )
    with os.fdopen(kube_config_fd, "w") as f:
        f.write(k8s_creds)
    # the creation mode is ignored when the file already existed: enforce it
    os.chmod(paths["kube_config"], 0o600)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
151
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository in the helm environment of a cluster.

    A 'helm repo update' is attempted first (its failure is not raised), then
    'helm repo add' is executed (its failure is raised).

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name of the repository
    :param url: url of the repository
    :param repo_type: only used in the debug trace
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # helm environment for this cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # helm repo update (best effort)
    update_command = "{} repo update".format(self._helm_command)
    self.log.debug("updating repo: {}".format(update_command))
    await self._local_async_exec(
        command=update_command, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    add_command = "{} repo add {} {}".format(self._helm_command, name, url)
    self.log.debug("adding repo: {}".format(add_command))
    await self._local_async_exec(
        command=add_command, raise_exception_on_error=True, env=env
    )

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)
186
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: list of registered repositories as parsed from the yaml output of
        'helm repo list', with all dictionary keys lowercased. An empty list
        is returned when the command fails or prints nothing.
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # helm environment for this cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    list_command = "{} repo list --output yaml".format(self._helm_command)

    # Errors are not raised: when there are no repos an empty list is wanted
    output, return_code = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)

    if return_code != 0 or not output:
        return []

    repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(repos)
224
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a registered helm repository from the helm environment of a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name of the repository to remove
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # helm environment for this cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    remove_command = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=remove_command, raise_exception_on_error=True, env=env
    )

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)
245
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
        before the cluster software is removed
    :param uninstall_sw: Boolean to request uninstalling the software installed
        in the cluster. It is ignored when the cluster still has releases and
        force is False
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_id, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for release in releases:
                    # bound before the try so the handler can always log it
                    kdu_instance = None
                    try:
                        kdu_instance = release.get("name")
                        chart = release.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                # Allow to remove k8s cluster without removing Tiller
                uninstall_sw = False

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_id
    shutil.rmtree(direct, ignore_errors=True)

    return True
317
async def _install_impl(
    self,
    cluster_id: str,
    kdu_model: str,
    paths: dict,
    env: dict,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Run the helm install command for a KDU and store its final status.

    :param cluster_id: cluster identifier (without namespace)
    :param kdu_model: chart reference, optionally as 'chart:version'
    :param paths: not used in this method
    :param env: environment passed to the helm process
    :param kdu_instance: name of the release to install
    :param atomic: when True the status is stored periodically in a parallel
        task while helm is running
    :param timeout: passed to the install command builder
    :param params: values for the chart, written to a temporary values file
    :param db_dict: passed through to the status writer
    :param kdu_name: not used in this method
    :param namespace: namespace where the release is installed
    :raises K8sException: if the helm command returns a non-zero code
    """
    # params to a temporary values file option
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    # a model given as exactly 'chart:version' carries the version to install
    version = None
    if ":" in kdu_model:
        model_parts = kdu_model.split(sep=":")
        if len(model_parts) == 2:
            kdu_model, version = model_parts[0], str(model_parts[1])

    command = self._get_install_command(
        kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    )

    self.log.debug("installing: {}".format(command))

    helm_exec = self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    if not atomic:
        output, rc = await helm_exec
    else:
        # exec helm in a task
        exec_task = asyncio.ensure_future(helm_exec)

        # write status in another task
        status_task = asyncio.ensure_future(
            self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                db_dict=db_dict,
                operation="install",
                run_once=False,
            )
        )

        # wait for the helm execution, then stop the periodic status writer
        await asyncio.wait([exec_task])
        status_task.cancel()

        output, rc = exec_task.result()

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="install",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)
404
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance and return its new revision number.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference, optionally as 'chart:version'
    :param atomic: when True the status is stored periodically in a parallel
        task while helm is running
    :param timeout: passed to the upgrade command builder
    :param params: values for the chart, written to a temporary values file
    :param db_dict: passed through to the status writer
    :return: the revision of the instance after the upgrade, or 0 if the
        instance can not be found afterwards
    :raises K8sException: if the instance does not exist or helm fails
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # helm environment for this cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to a temporary values file option
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    # a model given as exactly 'chart:version' carries the version to use
    # NOTE(review): kdu_model defaults to None, which makes the membership
    # test below raise TypeError - callers apparently always pass it; confirm
    version = None
    if ":" in kdu_model:
        model_parts = kdu_model.split(sep=":")
        if len(model_parts) == 2:
            kdu_model, version = model_parts[0], str(model_parts[1])

    namespace = instance_info["namespace"]
    command = self._get_upgrade_command(
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
    )

    self.log.debug("upgrading: {}".format(command))

    helm_exec = self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    if not atomic:
        output, rc = await helm_exec
    else:
        # exec helm in a task
        exec_task = asyncio.ensure_future(helm_exec)
        # write status in another task
        status_task = asyncio.ensure_future(
            self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="upgrade",
                run_once=False,
            )
        )

        # wait for the helm execution, then stop the periodic status writer
        await asyncio.wait([exec_task])
        status_task.cancel()
        output, rc = exec_task.result()

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0

    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
522
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scaling is not available in this connector.

    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
532
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Obtaining the scale count is not available in this connector.

    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
540
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a KDU instance back to a revision and return the resulting revision.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to roll back
    :param revision: revision passed to the rollback command builder
    :param db_dict: passed through to the status writer
    :return: the revision of the instance after the rollback, or 0 if the
        instance can not be found afterwards
    :raises K8sException: if the instance does not exist or helm fails
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # helm environment for this cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    namespace = instance_info["namespace"]
    command = self._get_rollback_command(kdu_instance, namespace, revision)

    self.log.debug("rolling_back: {}".format(command))

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    # wait for the helm execution, then stop the periodic status writer
    await asyncio.wait([exec_task])
    status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0

    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
626
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: the output of the uninstall command, parsed as a table (list of
        lists of cells)
    :raises K8sException: if the instance is not found
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # helm environment for this cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    namespace = instance_info["namespace"]
    command = self._get_uninstall_command(kdu_instance, namespace)
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(output)
666
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the helm version dependent _instances_list returns
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # delegate to the helm version dependent implementation
    releases = await self._instances_list(cluster_id)

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
688
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a release by name among the releases deployed in a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to look for
    :return: the first listed instance whose "name" matches, or None
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (candidate for candidate in instances if candidate.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
696
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Not supported for KDUs deployed with Helm: this method always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    message = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(message)
723
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
            name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # names of the services of the kdu, then the detail of each one in turn
    service_names = await self._get_services(cluster_id, kdu_instance, namespace)
    service_list = [
        await self._get_service(cluster_id, service_name, namespace)
        for service_name in service_names
    ]

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)

    return service_list
765
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of a single service of a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: whatever the internal _get_service returns for that service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    service_data = await self._get_service(cluster_id, service_name, namespace)

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)

    return service_data
787
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    This call retrieves the current state of a given KDU instance. It allows
    to retrieve the _composition_ (i.e. K8s objects) and _specific values_ of
    the configuration parameters applied to a given instance. This call is
    based on the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: If successful, it will return the following vector of arguments:
    - K8s `namespace` in the cluster where the KDU lives
    - `state` of the KDU instance. It can be:
          - UNKNOWN
          - DEPLOYED
          - DELETED
          - SUPERSEDED
          - FAILED or
          - DELETING
    - List of `resources` (objects) that this release consists of, sorted by kind,
      and the status of those resources
    - Last `deployment_time`.
    :raises K8sException: if the instance is not found in the cluster

    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # bring the local dir up to date
    self.fs.sync(from_path=cluster_id)

    # get instance: needed to obtain namespace
    instances = await self._instances_list(cluster_id=cluster_id)
    instance = next(
        (candidate for candidate in instances if candidate.get("name") == kdu_instance),
        None,
    )
    if instance is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_id
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # push local changes back to the shared fs
    self.fs.reverse_sync(from_path=cluster_id)

    return status
848
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a kdu model.

    :param kdu_model: chart reference
    :param repo_url: optional repository url
    :return: result of the "values" inspect command
    """

    debug_msg = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    values = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return values
860
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a kdu model.

    :param kdu_model: chart reference
    :param repo_url: optional repository url
    :return: result of the "readme" inspect command
    """

    debug_msg = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
870
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repositories of a cluster with the ones in the database.

    Repositories of the database that are missing locally (or whose url
    changed) are added; local repositories that are not in the database,
    except "stable", are removed.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: tuple (deleted_repo_list, added_repo_dict). deleted_repo_list
        holds the database id of repos re-added because of an url change and
        the name of local repos removed; added_repo_dict maps the database id
        of each added repo to its name
    :raises K8sException: if a repository can not be added
    :raises Exception: on any other error synchronizing repos
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # obtained before the try so the error message below always
            # refers to the repo being processed
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and add again".format(
                                db_repo["url"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repo that can not be deleted is only
                    # reported, it does not abort the synchronization
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other error is logged and reported as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
936
def _get_db_repos_dict(self, repo_ids: list):
    """
    Read the given repos from the "k8srepos" collection of the database.

    :param repo_ids: list of repo identifiers ("_id")
    :return: dictionary of the database repo records indexed by repo name
    """
    db_repos = (self.db.get_one("k8srepos", {"_id": repo_id}) for repo_id in repo_ids)
    return {db_repo["name"]: db_repo for db_repo in db_repos}
943
944 """
945 ####################################################################################
946 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
947 ####################################################################################
948 """
949
950 @abc.abstractmethod
951 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
952 """
953 Creates and returns base cluster and kube dirs and returns them.
954 Also created helm3 dirs according to new directory specification, paths are
955 not returned but assigned to helm environment variables
956
957 :param cluster_name: cluster_name
958 :return: Dictionary with config_paths and dictionary with helm environment variables
959 """
960
961 @abc.abstractmethod
962 async def _cluster_init(self, cluster_id, namespace, paths, env):
963 """
964 Implements the helm version dependent cluster initialization
965 """
966
967 @abc.abstractmethod
968 async def _instances_list(self, cluster_id):
969 """
970 Implements the helm version dependent helm instances list
971 """
972
973 @abc.abstractmethod
974 async def _get_services(self, cluster_id, kdu_instance, namespace):
975 """
976 Implements the helm version dependent method to obtain services from a helm instance
977 """
978
979 @abc.abstractmethod
980 async def _status_kdu(
981 self,
982 cluster_id: str,
983 kdu_instance: str,
984 namespace: str = None,
985 show_error_log: bool = False,
986 return_text: bool = False,
987 ):
988 """
989 Implements the helm version dependent method to obtain status of a helm instance
990 """
991
992 @abc.abstractmethod
993 def _get_install_command(
994 self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
995 ) -> str:
996 """
997 Obtain command to be executed to delete the indicated instance
998 """
999
1000 @abc.abstractmethod
1001 def _get_upgrade_command(
1002 self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
1003 ) -> str:
1004 """
1005 Obtain command to be executed to upgrade the indicated instance
1006 """
1007
1008 @abc.abstractmethod
1009 def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
1010 """
1011 Obtain command to be executed to rollback the indicated instance
1012 """
1013
1014 @abc.abstractmethod
1015 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
1016 """
1017 Obtain command to be executed to delete the indicated instance
1018 """
1019
1020 @abc.abstractmethod
1021 def _get_inspect_command(
1022 self, show_command: str, kdu_model: str, repo_str: str, version: str
1023 ):
1024 """
1025 Obtain command to be executed to obtain information about the kdu
1026 """
1027
1028 @abc.abstractmethod
1029 async def _uninstall_sw(self, cluster_id: str, namespace: str):
1030 """
1031 Method call to uninstall cluster software for helm. This method is dependent
1032 of helm version
1033 For Helm v2 it will be called when Tiller must be uninstalled
1034 For Helm v3 it does nothing and does not need to be callled
1035 """
1036
1037 @abc.abstractmethod
1038 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
1039 """
1040 Obtains the cluster repos identifiers
1041 """
1042
1043 """
1044 ####################################################################################
1045 ################################### P R I V A T E ##################################
1046 ####################################################################################
1047 """
1048
1049 @staticmethod
1050 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1051 if os.path.exists(filename):
1052 return True
1053 else:
1054 msg = "File {} does not exist".format(filename)
1055 if exception_if_not_exists:
1056 raise K8sException(msg)
1057
1058 @staticmethod
1059 def _remove_multiple_spaces(strobj):
1060 strobj = strobj.strip()
1061 while " " in strobj:
1062 strobj = strobj.replace(" ", " ")
1063 return strobj
1064
1065 @staticmethod
1066 def _output_to_lines(output: str) -> list:
1067 output_lines = list()
1068 lines = output.splitlines(keepends=False)
1069 for line in lines:
1070 line = line.strip()
1071 if len(line) > 0:
1072 output_lines.append(line)
1073 return output_lines
1074
1075 @staticmethod
1076 def _output_to_table(output: str) -> list:
1077 output_table = list()
1078 lines = output.splitlines(keepends=False)
1079 for line in lines:
1080 line = line.replace("\t", " ")
1081 line_list = list()
1082 output_table.append(line_list)
1083 cells = line.split(sep=" ")
1084 for cell in cells:
1085 cell = cell.strip()
1086 if len(cell) > 0:
1087 line_list.append(cell)
1088 return output_table
1089
1090 @staticmethod
1091 def _parse_services(output: str) -> list:
1092 lines = output.splitlines(keepends=False)
1093 services = []
1094 for line in lines:
1095 line = line.replace("\t", " ")
1096 cells = line.split(sep=" ")
1097 if len(cells) > 0 and cells[0].startswith("service/"):
1098 elems = cells[0].split(sep="/")
1099 if len(elems) > 1:
1100 services.append(elems[1])
1101 return services
1102
1103 @staticmethod
1104 def _get_deep(dictionary: dict, members: tuple):
1105 target = dictionary
1106 value = None
1107 try:
1108 for m in members:
1109 value = target.get(m)
1110 if not value:
1111 return None
1112 else:
1113 target = value
1114 except Exception:
1115 pass
1116 return value
1117
1118 # find key:value in several lines
1119 @staticmethod
1120 def _find_in_lines(p_lines: list, p_key: str) -> str:
1121 for line in p_lines:
1122 try:
1123 if line.startswith(p_key + ":"):
1124 parts = line.split(":")
1125 the_value = parts[1].strip()
1126 return the_value
1127 except Exception:
1128 # ignore it
1129 pass
1130 return None
1131
1132 @staticmethod
1133 def _lower_keys_list(input_list: list):
1134 """
1135 Transform the keys in a list of dictionaries to lower case and returns a new list
1136 of dictionaries
1137 """
1138 new_list = []
1139 if input_list:
1140 for dictionary in input_list:
1141 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1142 new_list.append(new_dict)
1143 return new_list
1144
1145 def _local_exec(self, command: str) -> (str, int):
1146 command = self._remove_multiple_spaces(command)
1147 self.log.debug("Executing sync local command: {}".format(command))
1148 # raise exception if fails
1149 output = ""
1150 try:
1151 output = subprocess.check_output(
1152 command, shell=True, universal_newlines=True
1153 )
1154 return_code = 0
1155 self.log.debug(output)
1156 except Exception:
1157 return_code = 1
1158
1159 return output, return_code
1160
1161 async def _local_async_exec(
1162 self,
1163 command: str,
1164 raise_exception_on_error: bool = False,
1165 show_error_log: bool = True,
1166 encode_utf8: bool = False,
1167 env: dict = None,
1168 ) -> (str, int):
1169
1170 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1171 self.log.debug(
1172 "Executing async local command: {}, env: {}".format(command, env)
1173 )
1174
1175 # split command
1176 command = shlex.split(command)
1177
1178 environ = os.environ.copy()
1179 if env:
1180 environ.update(env)
1181
1182 try:
1183 process = await asyncio.create_subprocess_exec(
1184 *command,
1185 stdout=asyncio.subprocess.PIPE,
1186 stderr=asyncio.subprocess.PIPE,
1187 env=environ,
1188 )
1189
1190 # wait for command terminate
1191 stdout, stderr = await process.communicate()
1192
1193 return_code = process.returncode
1194
1195 output = ""
1196 if stdout:
1197 output = stdout.decode("utf-8").strip()
1198 # output = stdout.decode()
1199 if stderr:
1200 output = stderr.decode("utf-8").strip()
1201 # output = stderr.decode()
1202
1203 if return_code != 0 and show_error_log:
1204 self.log.debug(
1205 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1206 )
1207 else:
1208 self.log.debug("Return code: {}".format(return_code))
1209
1210 if raise_exception_on_error and return_code != 0:
1211 raise K8sException(output)
1212
1213 if encode_utf8:
1214 output = output.encode("utf-8").strip()
1215 output = str(output).replace("\\n", "\n")
1216
1217 return output, return_code
1218
1219 except asyncio.CancelledError:
1220 raise
1221 except K8sException:
1222 raise
1223 except Exception as e:
1224 msg = "Exception executing command: {} -> {}".format(command, e)
1225 self.log.error(msg)
1226 if raise_exception_on_error:
1227 raise K8sException(e) from e
1228 else:
1229 return "", -1
1230
1231 async def _local_async_exec_pipe(
1232 self,
1233 command1: str,
1234 command2: str,
1235 raise_exception_on_error: bool = True,
1236 show_error_log: bool = True,
1237 encode_utf8: bool = False,
1238 env: dict = None,
1239 ):
1240
1241 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1242 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1243 command = "{} | {}".format(command1, command2)
1244 self.log.debug(
1245 "Executing async local command: {}, env: {}".format(command, env)
1246 )
1247
1248 # split command
1249 command1 = shlex.split(command1)
1250 command2 = shlex.split(command2)
1251
1252 environ = os.environ.copy()
1253 if env:
1254 environ.update(env)
1255
1256 try:
1257 read, write = os.pipe()
1258 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1259 os.close(write)
1260 process_2 = await asyncio.create_subprocess_exec(
1261 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1262 )
1263 os.close(read)
1264 stdout, stderr = await process_2.communicate()
1265
1266 return_code = process_2.returncode
1267
1268 output = ""
1269 if stdout:
1270 output = stdout.decode("utf-8").strip()
1271 # output = stdout.decode()
1272 if stderr:
1273 output = stderr.decode("utf-8").strip()
1274 # output = stderr.decode()
1275
1276 if return_code != 0 and show_error_log:
1277 self.log.debug(
1278 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1279 )
1280 else:
1281 self.log.debug("Return code: {}".format(return_code))
1282
1283 if raise_exception_on_error and return_code != 0:
1284 raise K8sException(output)
1285
1286 if encode_utf8:
1287 output = output.encode("utf-8").strip()
1288 output = str(output).replace("\\n", "\n")
1289
1290 return output, return_code
1291 except asyncio.CancelledError:
1292 raise
1293 except K8sException:
1294 raise
1295 except Exception as e:
1296 msg = "Exception executing command: {} -> {}".format(command, e)
1297 self.log.error(msg)
1298 if raise_exception_on_error:
1299 raise K8sException(e) from e
1300 else:
1301 return "", -1
1302
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; empty while no ingress ip is
    reported
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # _get_deep gives None when the ingress list is missing or empty, and
        # an ingress entry is not guaranteed to carry an "ip" key
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if "ip" in elem
        ]

    return service
1346
1347 async def _exec_inspect_comand(
1348 self, inspect_command: str, kdu_model: str, repo_url: str = None
1349 ):
1350 """
1351 Obtains information about a kdu, no cluster (no env)
1352 """
1353
1354 repo_str = ""
1355 if repo_url:
1356 repo_str = " --repo {}".format(repo_url)
1357
1358 idx = kdu_model.find("/")
1359 if idx >= 0:
1360 idx += 1
1361 kdu_model = kdu_model[idx:]
1362
1363 version = ""
1364 if ":" in kdu_model:
1365 parts = kdu_model.split(sep=":")
1366 if len(parts) == 2:
1367 version = "--version {}".format(str(parts[1]))
1368 kdu_model = parts[0]
1369
1370 full_command = self._get_inspect_command(
1371 inspect_command, kdu_model, repo_str, version
1372 )
1373 output, _rc = await self._local_async_exec(
1374 command=full_command, encode_utf8=True
1375 )
1376
1377 return output
1378
1379 async def _store_status(
1380 self,
1381 cluster_id: str,
1382 operation: str,
1383 kdu_instance: str,
1384 namespace: str = None,
1385 check_every: float = 10,
1386 db_dict: dict = None,
1387 run_once: bool = False,
1388 ):
1389 while True:
1390 try:
1391 await asyncio.sleep(check_every)
1392 detailed_status = await self._status_kdu(
1393 cluster_id=cluster_id,
1394 kdu_instance=kdu_instance,
1395 namespace=namespace,
1396 return_text=False,
1397 )
1398 status = detailed_status.get("info").get("description")
1399 self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
1400 # write status to db
1401 result = await self.write_app_status_to_db(
1402 db_dict=db_dict,
1403 status=str(status),
1404 detailed_status=str(detailed_status),
1405 operation=operation,
1406 )
1407 if not result:
1408 self.log.info("Error writing in database. Task exiting...")
1409 return
1410 except asyncio.CancelledError:
1411 self.log.debug("Task cancelled")
1412 return
1413 except Exception as e:
1414 self.log.debug(
1415 "_store_status exception: {}".format(str(e)), exc_info=True
1416 )
1417 pass
1418 finally:
1419 if run_once:
1420 return
1421
1422 # params for use in -f file
1423 # returns values file option and filename (in order to delete it at the end)
1424 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1425
1426 if params and len(params) > 0:
1427 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1428
1429 def get_random_number():
1430 r = random.randrange(start=1, stop=99999999)
1431 s = str(r)
1432 while len(s) < 10:
1433 s = "0" + s
1434 return s
1435
1436 params2 = dict()
1437 for key in params:
1438 value = params.get(key)
1439 if "!!yaml" in str(value):
1440 value = yaml.load(value[7:])
1441 params2[key] = value
1442
1443 values_file = get_random_number() + ".yaml"
1444 with open(values_file, "w") as stream:
1445 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1446
1447 return "-f {}".format(values_file), values_file
1448
1449 return "", None
1450
1451 # params for use in --set option
1452 @staticmethod
1453 def _params_to_set_option(params: dict) -> str:
1454 params_str = ""
1455 if params and len(params) > 0:
1456 start = True
1457 for key in params:
1458 value = params.get(key, None)
1459 if value is not None:
1460 if start:
1461 params_str += "--set "
1462 start = False
1463 else:
1464 params_str += ","
1465 params_str += "{}={}".format(key, value)
1466 return params_str
1467
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Generate a name for a KDU instance from the name of its chart.

    The chart name is reduced to its last path component when it is an
    absolute path or a URL, every character that is not a letter or a number
    becomes "-", the result is cut to 35 characters and prefixed with "a" when
    it does not start with a letter (or is empty). A "-" and a random
    10-digit, zero-padded number are appended and the whole name is lower-cased.

    :param kwargs: must contain "kdu_model", the chart reference
    :return: the generated name
    :raises KeyError: if "kdu_model" is not given
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last portion
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1:]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )
    name = name[:35]

    # if empty or not starting with an alpha character, prefix 'a'
    # (the emptiness check avoids an IndexError, e.g. for a path ending in "/")
    if not name or not name[0].isalpha():
        name = "a" + name

    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_number).lower()