Corrected bugs on support for helm v3
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import subprocess
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45 service_account = "osm"
46
47 def __init__(
48 self,
49 fs: object,
50 db: object,
51 kubectl_command: str = "/usr/bin/kubectl",
52 helm_command: str = "/usr/bin/helm",
53 log: object = None,
54 on_update_db=None,
55 ):
56 """
57
58 :param fs: file system for kubernetes and helm configuration
59 :param db: database object to write current operation status
60 :param kubectl_command: path to kubectl executable
61 :param helm_command: path to helm executable
62 :param log: logger
63 :param on_update_db: callback called when k8s connector updates database
64 """
65
66 # parent class
67 K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
68
69 self.log.info("Initializing K8S Helm connector")
70
71 # random numbers for release name generation
72 random.seed(time.time())
73
74 # the file system
75 self.fs = fs
76
77 # exception if kubectl is not installed
78 self.kubectl_command = kubectl_command
79 self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
80
81 # exception if helm is not installed
82 self._helm_command = helm_command
83 self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
84
85 @staticmethod
86 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
87 """
88 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
89 cluster_id for backward compatibility
90 """
91 namespace, _, cluster_id = cluster_uuid.rpartition(':')
92 return namespace, cluster_id
93
94 async def init_env(
95 self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
96 ) -> (str, bool):
97 """
98 It prepares a given K8s cluster environment to run Charts
99
100 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
101 '.kube/config'
102 :param namespace: optional namespace to be used for helm. By default,
103 'kube-system' will be used
104 :param reuse_cluster_uuid: existing cluster uuid for reuse
105 :return: uuid of the K8s cluster and True if connector has installed some
106 software in the cluster
107 (on error, an exception will be raised)
108 """
109
110 if reuse_cluster_uuid:
111 namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
112 namespace = namespace_ or namespace
113 else:
114 cluster_id = str(uuid4())
115 cluster_uuid = "{}:{}".format(namespace, cluster_id)
116
117 self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))
118
119 paths, env = self._init_paths_env(
120 cluster_name=cluster_id, create_if_not_exist=True
121 )
122 mode = stat.S_IRUSR | stat.S_IWUSR
123 with open(paths["kube_config"], "w", mode) as f:
124 f.write(k8s_creds)
125 os.chmod(paths["kube_config"], 0o600)
126
127 # Code with initialization specific of helm version
128 n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)
129
130 # sync fs with local data
131 self.fs.reverse_sync(from_path=cluster_id)
132
133 self.log.info("Cluster {} initialized".format(cluster_id))
134
135 return cluster_uuid, n2vc_installed_sw
136
137 async def repo_add(
138 self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
139 ):
140 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
141 self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
142 cluster_id, repo_type, name, url))
143
144 # sync local dir
145 self.fs.sync(from_path=cluster_id)
146
147 # init_env
148 paths, env = self._init_paths_env(
149 cluster_name=cluster_id, create_if_not_exist=True
150 )
151
152 # helm repo update
153 command = "{} repo update".format(
154 self._helm_command
155 )
156 self.log.debug("updating repo: {}".format(command))
157 await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)
158
159 # helm repo add name url
160 command = "{} repo add {} {}".format(
161 self._helm_command, name, url
162 )
163 self.log.debug("adding repo: {}".format(command))
164 await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
165
166 # sync fs
167 self.fs.reverse_sync(from_path=cluster_id)
168
169 async def repo_list(self, cluster_uuid: str) -> list:
170 """
171 Get the list of registered repositories
172
173 :return: list of registered repositories: [ (name, url) .... ]
174 """
175
176 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
177 self.log.debug("list repositories for cluster {}".format(cluster_id))
178
179 # sync local dir
180 self.fs.sync(from_path=cluster_id)
181
182 # config filename
183 paths, env = self._init_paths_env(
184 cluster_name=cluster_id, create_if_not_exist=True
185 )
186
187 command = "{} repo list --output yaml".format(
188 self._helm_command
189 )
190
191 # Set exception to false because if there are no repos just want an empty list
192 output, _rc = await self._local_async_exec(
193 command=command, raise_exception_on_error=False, env=env
194 )
195
196 # sync fs
197 self.fs.reverse_sync(from_path=cluster_id)
198
199 if _rc == 0:
200 if output and len(output) > 0:
201 repos = yaml.load(output, Loader=yaml.SafeLoader)
202 # unify format between helm2 and helm3 setting all keys lowercase
203 return self._lower_keys_list(repos)
204 else:
205 return []
206 else:
207 return []
208
209 async def repo_remove(self, cluster_uuid: str, name: str):
210
211 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
212 self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))
213
214 # sync local dir
215 self.fs.sync(from_path=cluster_id)
216
217 # init env, paths
218 paths, env = self._init_paths_env(
219 cluster_name=cluster_id, create_if_not_exist=True
220 )
221
222 command = "{} repo remove {}".format(
223 self._helm_command, name
224 )
225 await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)
226
227 # sync fs
228 self.fs.reverse_sync(from_path=cluster_id)
229
230 async def reset(
231 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
232 ) -> bool:
233
234 namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
235 self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
236 .format(cluster_id, uninstall_sw))
237
238 # sync local dir
239 self.fs.sync(from_path=cluster_id)
240
241 # uninstall releases if needed.
242 if uninstall_sw:
243 releases = await self.instances_list(cluster_uuid=cluster_uuid)
244 if len(releases) > 0:
245 if force:
246 for r in releases:
247 try:
248 kdu_instance = r.get("name")
249 chart = r.get("chart")
250 self.log.debug(
251 "Uninstalling {} -> {}".format(chart, kdu_instance)
252 )
253 await self.uninstall(
254 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
255 )
256 except Exception as e:
257 # will not raise exception as it was found
258 # that in some cases of previously installed helm releases it
259 # raised an error
260 self.log.warn(
261 "Error uninstalling release {}: {}".format(kdu_instance, e)
262 )
263 else:
264 msg = (
265 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
266 ).format(cluster_id)
267 self.log.warn(msg)
268 uninstall_sw = False # Allow to remove k8s cluster without removing Tiller
269
270 if uninstall_sw:
271 await self._uninstall_sw(cluster_id, namespace)
272
273 # delete cluster directory
274 self.log.debug("Removing directory {}".format(cluster_id))
275 self.fs.file_delete(cluster_id, ignore_non_exist=True)
276 # Remove also local directorio if still exist
277 direct = self.fs.path + "/" + cluster_id
278 shutil.rmtree(direct, ignore_errors=True)
279
280 return True
281
282 async def _install_impl(
283 self,
284 cluster_id: str,
285 kdu_model: str,
286 paths: dict,
287 env: dict,
288 atomic: bool = True,
289 timeout: float = 300,
290 params: dict = None,
291 db_dict: dict = None,
292 kdu_name: str = None,
293 namespace: str = None,
294 ):
295 # params to str
296 params_str, file_to_delete = self._params_to_file_option(
297 cluster_id=cluster_id, params=params
298 )
299
300 # version
301 version = None
302 if ":" in kdu_model:
303 parts = kdu_model.split(sep=":")
304 if len(parts) == 2:
305 version = str(parts[1])
306 kdu_model = parts[0]
307
308 # generate a name for the release. Then, check if already exists
309 kdu_instance = None
310 while kdu_instance is None:
311 kdu_instance = self._generate_release_name(kdu_model)
312 try:
313 result = await self._status_kdu(
314 cluster_id=cluster_id,
315 kdu_instance=kdu_instance,
316 namespace=namespace,
317 show_error_log=False,
318 )
319 if result is not None:
320 # instance already exists: generate a new one
321 kdu_instance = None
322 except K8sException:
323 pass
324
325 command = self._get_install_command(kdu_model, kdu_instance, namespace,
326 params_str, version, atomic, timeout)
327
328 self.log.debug("installing: {}".format(command))
329
330 if atomic:
331 # exec helm in a task
332 exec_task = asyncio.ensure_future(
333 coro_or_future=self._local_async_exec(
334 command=command, raise_exception_on_error=False, env=env
335 )
336 )
337
338 # write status in another task
339 status_task = asyncio.ensure_future(
340 coro_or_future=self._store_status(
341 cluster_id=cluster_id,
342 kdu_instance=kdu_instance,
343 namespace=namespace,
344 db_dict=db_dict,
345 operation="install",
346 run_once=False,
347 )
348 )
349
350 # wait for execution task
351 await asyncio.wait([exec_task])
352
353 # cancel status task
354 status_task.cancel()
355
356 output, rc = exec_task.result()
357
358 else:
359
360 output, rc = await self._local_async_exec(
361 command=command, raise_exception_on_error=False, env=env
362 )
363
364 # remove temporal values yaml file
365 if file_to_delete:
366 os.remove(file_to_delete)
367
368 # write final status
369 await self._store_status(
370 cluster_id=cluster_id,
371 kdu_instance=kdu_instance,
372 namespace=namespace,
373 db_dict=db_dict,
374 operation="install",
375 run_once=True,
376 check_every=0,
377 )
378
379 if rc != 0:
380 msg = "Error executing command: {}\nOutput: {}".format(command, output)
381 self.log.error(msg)
382 raise K8sException(msg)
383
384 return kdu_instance
385
386 async def upgrade(
387 self,
388 cluster_uuid: str,
389 kdu_instance: str,
390 kdu_model: str = None,
391 atomic: bool = True,
392 timeout: float = 300,
393 params: dict = None,
394 db_dict: dict = None,
395 ):
396 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
397 self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))
398
399 # sync local dir
400 self.fs.sync(from_path=cluster_id)
401
402 # look for instance to obtain namespace
403 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
404 if not instance_info:
405 raise K8sException("kdu_instance {} not found".format(kdu_instance))
406
407 # init env, paths
408 paths, env = self._init_paths_env(
409 cluster_name=cluster_id, create_if_not_exist=True
410 )
411
412 # params to str
413 params_str, file_to_delete = self._params_to_file_option(
414 cluster_id=cluster_id, params=params
415 )
416
417 # version
418 version = None
419 if ":" in kdu_model:
420 parts = kdu_model.split(sep=":")
421 if len(parts) == 2:
422 version = str(parts[1])
423 kdu_model = parts[0]
424
425 command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
426 params_str, version, atomic, timeout)
427
428 self.log.debug("upgrading: {}".format(command))
429
430 if atomic:
431
432 # exec helm in a task
433 exec_task = asyncio.ensure_future(
434 coro_or_future=self._local_async_exec(
435 command=command, raise_exception_on_error=False, env=env
436 )
437 )
438 # write status in another task
439 status_task = asyncio.ensure_future(
440 coro_or_future=self._store_status(
441 cluster_id=cluster_id,
442 kdu_instance=kdu_instance,
443 namespace=instance_info["namespace"],
444 db_dict=db_dict,
445 operation="upgrade",
446 run_once=False,
447 )
448 )
449
450 # wait for execution task
451 await asyncio.wait([exec_task])
452
453 # cancel status task
454 status_task.cancel()
455 output, rc = exec_task.result()
456
457 else:
458
459 output, rc = await self._local_async_exec(
460 command=command, raise_exception_on_error=False, env=env
461 )
462
463 # remove temporal values yaml file
464 if file_to_delete:
465 os.remove(file_to_delete)
466
467 # write final status
468 await self._store_status(
469 cluster_id=cluster_id,
470 kdu_instance=kdu_instance,
471 namespace=instance_info["namespace"],
472 db_dict=db_dict,
473 operation="upgrade",
474 run_once=True,
475 check_every=0,
476 )
477
478 if rc != 0:
479 msg = "Error executing command: {}\nOutput: {}".format(command, output)
480 self.log.error(msg)
481 raise K8sException(msg)
482
483 # sync fs
484 self.fs.reverse_sync(from_path=cluster_id)
485
486 # return new revision number
487 instance = await self.get_instance_info(
488 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
489 )
490 if instance:
491 revision = int(instance.get("revision"))
492 self.log.debug("New revision: {}".format(revision))
493 return revision
494 else:
495 return 0
496
497 async def rollback(
498 self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
499 ):
500
501 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
502 self.log.debug(
503 "rollback kdu_instance {} to revision {} from cluster {}".format(
504 kdu_instance, revision, cluster_id
505 )
506 )
507
508 # sync local dir
509 self.fs.sync(from_path=cluster_id)
510
511 # look for instance to obtain namespace
512 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
513 if not instance_info:
514 raise K8sException("kdu_instance {} not found".format(kdu_instance))
515
516 # init env, paths
517 paths, env = self._init_paths_env(
518 cluster_name=cluster_id, create_if_not_exist=True
519 )
520
521 command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
522 revision)
523
524 self.log.debug("rolling_back: {}".format(command))
525
526 # exec helm in a task
527 exec_task = asyncio.ensure_future(
528 coro_or_future=self._local_async_exec(
529 command=command, raise_exception_on_error=False, env=env
530 )
531 )
532 # write status in another task
533 status_task = asyncio.ensure_future(
534 coro_or_future=self._store_status(
535 cluster_id=cluster_id,
536 kdu_instance=kdu_instance,
537 namespace=instance_info["namespace"],
538 db_dict=db_dict,
539 operation="rollback",
540 run_once=False,
541 )
542 )
543
544 # wait for execution task
545 await asyncio.wait([exec_task])
546
547 # cancel status task
548 status_task.cancel()
549
550 output, rc = exec_task.result()
551
552 # write final status
553 await self._store_status(
554 cluster_id=cluster_id,
555 kdu_instance=kdu_instance,
556 namespace=instance_info["namespace"],
557 db_dict=db_dict,
558 operation="rollback",
559 run_once=True,
560 check_every=0,
561 )
562
563 if rc != 0:
564 msg = "Error executing command: {}\nOutput: {}".format(command, output)
565 self.log.error(msg)
566 raise K8sException(msg)
567
568 # sync fs
569 self.fs.reverse_sync(from_path=cluster_id)
570
571 # return new revision number
572 instance = await self.get_instance_info(
573 cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
574 )
575 if instance:
576 revision = int(instance.get("revision"))
577 self.log.debug("New revision: {}".format(revision))
578 return revision
579 else:
580 return 0
581
582 async def uninstall(self, cluster_uuid: str, kdu_instance: str):
583 """
584 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
585 (this call should happen after all _terminate-config-primitive_ of the VNF
586 are invoked).
587
588 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
589 :param kdu_instance: unique name for the KDU instance to be deleted
590 :return: True if successful
591 """
592
593 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
594 self.log.debug(
595 "uninstall kdu_instance {} from cluster {}".format(
596 kdu_instance, cluster_id
597 )
598 )
599
600 # sync local dir
601 self.fs.sync(from_path=cluster_id)
602
603 # look for instance to obtain namespace
604 instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
605 if not instance_info:
606 raise K8sException("kdu_instance {} not found".format(kdu_instance))
607
608 # init env, paths
609 paths, env = self._init_paths_env(
610 cluster_name=cluster_id, create_if_not_exist=True
611 )
612
613 command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
614 output, _rc = await self._local_async_exec(
615 command=command, raise_exception_on_error=True, env=env
616 )
617
618 # sync fs
619 self.fs.reverse_sync(from_path=cluster_id)
620
621 return self._output_to_table(output)
622
623 async def instances_list(self, cluster_uuid: str) -> list:
624 """
625 returns a list of deployed releases in a cluster
626
627 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
628 :return:
629 """
630
631 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
632 self.log.debug("list releases for cluster {}".format(cluster_id))
633
634 # sync local dir
635 self.fs.sync(from_path=cluster_id)
636
637 # execute internal command
638 result = await self._instances_list(cluster_id)
639
640 # sync fs
641 self.fs.reverse_sync(from_path=cluster_id)
642
643 return result
644
645 async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
646 instances = await self.instances_list(cluster_uuid=cluster_uuid)
647 for instance in instances:
648 if instance.get("name") == kdu_instance:
649 return instance
650 self.log.debug("Instance {} not found".format(kdu_instance))
651 return None
652
653 async def exec_primitive(
654 self,
655 cluster_uuid: str = None,
656 kdu_instance: str = None,
657 primitive_name: str = None,
658 timeout: float = 300,
659 params: dict = None,
660 db_dict: dict = None,
661 ) -> str:
662 """Exec primitive (Juju action)
663
664 :param cluster_uuid: The UUID of the cluster or namespace:cluster
665 :param kdu_instance: The unique name of the KDU instance
666 :param primitive_name: Name of action that will be executed
667 :param timeout: Timeout for action execution
668 :param params: Dictionary of all the parameters needed for the action
669 :db_dict: Dictionary for any additional data
670
671 :return: Returns the output of the action
672 """
673 raise K8sException(
674 "KDUs deployed with Helm don't support actions "
675 "different from rollback, upgrade and status"
676 )
677
678 async def get_services(self,
679 cluster_uuid: str,
680 kdu_instance: str,
681 namespace: str) -> list:
682 """
683 Returns a list of services defined for the specified kdu instance.
684
685 :param cluster_uuid: UUID of a K8s cluster known by OSM
686 :param kdu_instance: unique name for the KDU instance
687 :param namespace: K8s namespace used by the KDU instance
688 :return: If successful, it will return a list of services, Each service
689 can have the following data:
690 - `name` of the service
691 - `type` type of service in the k8 cluster
692 - `ports` List of ports offered by the service, for each port includes at least
693 name, port, protocol
694 - `cluster_ip` Internal ip to be used inside k8s cluster
695 - `external_ip` List of external ips (in case they are available)
696 """
697
698 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
699 self.log.debug(
700 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
701 cluster_uuid, kdu_instance
702 )
703 )
704
705 # sync local dir
706 self.fs.sync(from_path=cluster_id)
707
708 # get list of services names for kdu
709 service_names = await self._get_services(cluster_id, kdu_instance, namespace)
710
711 service_list = []
712 for service in service_names:
713 service = await self._get_service(cluster_id, service, namespace)
714 service_list.append(service)
715
716 # sync fs
717 self.fs.reverse_sync(from_path=cluster_id)
718
719 return service_list
720
721 async def get_service(self,
722 cluster_uuid: str,
723 service_name: str,
724 namespace: str) -> object:
725
726 self.log.debug(
727 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
728 service_name, namespace, cluster_uuid)
729 )
730
731 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
732
733 # sync local dir
734 self.fs.sync(from_path=cluster_id)
735
736 service = await self._get_service(cluster_id, service_name, namespace)
737
738 # sync fs
739 self.fs.reverse_sync(from_path=cluster_id)
740
741 return service
742
743 async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
744
745 self.log.debug(
746 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
747 cluster_uuid, kdu_instance
748 )
749 )
750
751 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
752
753 # sync local dir
754 self.fs.sync(from_path=cluster_id)
755
756 # get instance: needed to obtain namespace
757 instances = await self._instances_list(cluster_id=cluster_id)
758 for instance in instances:
759 if instance.get("name") == kdu_instance:
760 break
761 else:
762 # instance does not exist
763 raise K8sException("Instance name: {} not found in cluster: {}".format(
764 kdu_instance, cluster_id))
765
766 status = await self._status_kdu(
767 cluster_id=cluster_id,
768 kdu_instance=kdu_instance,
769 namespace=instance["namespace"],
770 show_error_log=True,
771 return_text=True,
772 )
773
774 # sync fs
775 self.fs.reverse_sync(from_path=cluster_id)
776
777 return status
778
779 async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
780
781 self.log.debug(
782 "inspect kdu_model values {} from (optional) repo: {}".format(
783 kdu_model, repo_url
784 )
785 )
786
787 return await self._exec_inspect_comand(
788 inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
789 )
790
791 async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
792
793 self.log.debug(
794 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
795 )
796
797 return await self._exec_inspect_comand(
798 inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
799 )
800
801 async def synchronize_repos(self, cluster_uuid: str):
802
803 self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
804 try:
805 db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
806 db_repo_dict = self._get_db_repos_dict(db_repo_ids)
807
808 local_repo_list = await self.repo_list(cluster_uuid)
809 local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}
810
811 deleted_repo_list = []
812 added_repo_dict = {}
813
814 # iterate over the list of repos in the database that should be
815 # added if not present
816 for repo_name, db_repo in db_repo_dict.items():
817 try:
818 # check if it is already present
819 curr_repo_url = local_repo_dict.get(db_repo["name"])
820 repo_id = db_repo.get("_id")
821 if curr_repo_url != db_repo["url"]:
822 if curr_repo_url:
823 self.log.debug("repo {} url changed, delete and and again".format(
824 db_repo["url"]))
825 await self.repo_remove(cluster_uuid, db_repo["name"])
826 deleted_repo_list.append(repo_id)
827
828 # add repo
829 self.log.debug("add repo {}".format(db_repo["name"]))
830 await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
831 added_repo_dict[repo_id] = db_repo["name"]
832 except Exception as e:
833 raise K8sException(
834 "Error adding repo id: {}, err_msg: {} ".format(
835 repo_id, repr(e)
836 )
837 )
838
839 # Delete repos that are present but not in nbi_list
840 for repo_name in local_repo_dict:
841 if not db_repo_dict.get(repo_name) and repo_name != "stable":
842 self.log.debug("delete repo {}".format(repo_name))
843 try:
844 await self.repo_remove(cluster_uuid, repo_name)
845 deleted_repo_list.append(repo_name)
846 except Exception as e:
847 self.warning(
848 "Error deleting repo, name: {}, err_msg: {}".format(
849 repo_name, str(e)
850 )
851 )
852
853 return deleted_repo_list, added_repo_dict
854
855 except K8sException:
856 raise
857 except Exception as e:
858 # Do not raise errors synchronizing repos
859 self.log.error("Error synchronizing repos: {}".format(e))
860 raise Exception("Error synchronizing repos: {}".format(e))
861
862 def _get_db_repos_dict(self, repo_ids: list):
863 db_repos_dict = {}
864 for repo_id in repo_ids:
865 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
866 db_repos_dict[db_repo["name"]] = db_repo
867 return db_repos_dict
868
869 """
870 ####################################################################################
871 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
872 ####################################################################################
873 """
874
875 @abc.abstractmethod
876 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
877 """
878 Creates and returns base cluster and kube dirs and returns them.
879 Also created helm3 dirs according to new directory specification, paths are
880 not returned but assigned to helm environment variables
881
882 :param cluster_name: cluster_name
883 :return: Dictionary with config_paths and dictionary with helm environment variables
884 """
885
886 @abc.abstractmethod
887 async def _cluster_init(self, cluster_id, namespace, paths, env):
888 """
889 Implements the helm version dependent cluster initialization
890 """
891
892 @abc.abstractmethod
893 async def _instances_list(self, cluster_id):
894 """
895 Implements the helm version dependent helm instances list
896 """
897
898 @abc.abstractmethod
899 async def _get_services(self, cluster_id, kdu_instance, namespace):
900 """
901 Implements the helm version dependent method to obtain services from a helm instance
902 """
903
904 @abc.abstractmethod
905 async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
906 show_error_log: bool = False, return_text: bool = False):
907 """
908 Implements the helm version dependent method to obtain status of a helm instance
909 """
910
911 @abc.abstractmethod
912 def _get_install_command(self, kdu_model, kdu_instance, namespace,
913 params_str, version, atomic, timeout) -> str:
914 """
915 Obtain command to be executed to delete the indicated instance
916 """
917
918 @abc.abstractmethod
919 def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
920 params_str, version, atomic, timeout) -> str:
921 """
922 Obtain command to be executed to upgrade the indicated instance
923 """
924
925 @abc.abstractmethod
926 def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
927 """
928 Obtain command to be executed to rollback the indicated instance
929 """
930
931 @abc.abstractmethod
932 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
933 """
934 Obtain command to be executed to delete the indicated instance
935 """
936
937 @abc.abstractmethod
938 def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
939 version: str):
940 """
941 Obtain command to be executed to obtain information about the kdu
942 """
943
944 @abc.abstractmethod
945 async def _uninstall_sw(self, cluster_id: str, namespace: str):
946 """
947 Method call to uninstall cluster software for helm. This method is dependent
948 of helm version
949 For Helm v2 it will be called when Tiller must be uninstalled
950 For Helm v3 it does nothing and does not need to be callled
951 """
952
953 @abc.abstractmethod
954 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
955 """
956 Obtains the cluster repos identifiers
957 """
958
959 """
960 ####################################################################################
961 ################################### P R I V A T E ##################################
962 ####################################################################################
963 """
964
965 @staticmethod
966 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
967 if os.path.exists(filename):
968 return True
969 else:
970 msg = "File {} does not exist".format(filename)
971 if exception_if_not_exists:
972 raise K8sException(msg)
973
974 @staticmethod
975 def _remove_multiple_spaces(strobj):
976 strobj = strobj.strip()
977 while " " in strobj:
978 strobj = strobj.replace(" ", " ")
979 return strobj
980
981 @staticmethod
982 def _output_to_lines(output: str) -> list:
983 output_lines = list()
984 lines = output.splitlines(keepends=False)
985 for line in lines:
986 line = line.strip()
987 if len(line) > 0:
988 output_lines.append(line)
989 return output_lines
990
991 @staticmethod
992 def _output_to_table(output: str) -> list:
993 output_table = list()
994 lines = output.splitlines(keepends=False)
995 for line in lines:
996 line = line.replace("\t", " ")
997 line_list = list()
998 output_table.append(line_list)
999 cells = line.split(sep=" ")
1000 for cell in cells:
1001 cell = cell.strip()
1002 if len(cell) > 0:
1003 line_list.append(cell)
1004 return output_table
1005
1006 @staticmethod
1007 def _parse_services(output: str) -> list:
1008 lines = output.splitlines(keepends=False)
1009 services = []
1010 for line in lines:
1011 line = line.replace("\t", " ")
1012 cells = line.split(sep=" ")
1013 if len(cells) > 0 and cells[0].startswith("service/"):
1014 elems = cells[0].split(sep="/")
1015 if len(elems) > 1:
1016 services.append(elems[1])
1017 return services
1018
1019 @staticmethod
1020 def _get_deep(dictionary: dict, members: tuple):
1021 target = dictionary
1022 value = None
1023 try:
1024 for m in members:
1025 value = target.get(m)
1026 if not value:
1027 return None
1028 else:
1029 target = value
1030 except Exception:
1031 pass
1032 return value
1033
1034 # find key:value in several lines
1035 @staticmethod
1036 def _find_in_lines(p_lines: list, p_key: str) -> str:
1037 for line in p_lines:
1038 try:
1039 if line.startswith(p_key + ":"):
1040 parts = line.split(":")
1041 the_value = parts[1].strip()
1042 return the_value
1043 except Exception:
1044 # ignore it
1045 pass
1046 return None
1047
1048 @staticmethod
1049 def _lower_keys_list(input_list: list):
1050 """
1051 Transform the keys in a list of dictionaries to lower case and returns a new list
1052 of dictionaries
1053 """
1054 new_list = []
1055 for dictionary in input_list:
1056 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1057 new_list.append(new_dict)
1058 return new_list
1059
1060 def _local_exec(self, command: str) -> (str, int):
1061 command = self._remove_multiple_spaces(command)
1062 self.log.debug("Executing sync local command: {}".format(command))
1063 # raise exception if fails
1064 output = ""
1065 try:
1066 output = subprocess.check_output(
1067 command, shell=True, universal_newlines=True
1068 )
1069 return_code = 0
1070 self.log.debug(output)
1071 except Exception:
1072 return_code = 1
1073
1074 return output, return_code
1075
1076 async def _local_async_exec(
1077 self,
1078 command: str,
1079 raise_exception_on_error: bool = False,
1080 show_error_log: bool = True,
1081 encode_utf8: bool = False,
1082 env: dict = None
1083 ) -> (str, int):
1084
1085 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1086 self.log.debug("Executing async local command: {}, env: {}".format(command, env))
1087
1088 # split command
1089 command = shlex.split(command)
1090
1091 environ = os.environ.copy()
1092 if env:
1093 environ.update(env)
1094
1095 try:
1096 process = await asyncio.create_subprocess_exec(
1097 *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1098 env=environ
1099 )
1100
1101 # wait for command terminate
1102 stdout, stderr = await process.communicate()
1103
1104 return_code = process.returncode
1105
1106 output = ""
1107 if stdout:
1108 output = stdout.decode("utf-8").strip()
1109 # output = stdout.decode()
1110 if stderr:
1111 output = stderr.decode("utf-8").strip()
1112 # output = stderr.decode()
1113
1114 if return_code != 0 and show_error_log:
1115 self.log.debug(
1116 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1117 )
1118 else:
1119 self.log.debug("Return code: {}".format(return_code))
1120
1121 if raise_exception_on_error and return_code != 0:
1122 raise K8sException(output)
1123
1124 if encode_utf8:
1125 output = output.encode("utf-8").strip()
1126 output = str(output).replace("\\n", "\n")
1127
1128 return output, return_code
1129
1130 except asyncio.CancelledError:
1131 raise
1132 except K8sException:
1133 raise
1134 except Exception as e:
1135 msg = "Exception executing command: {} -> {}".format(command, e)
1136 self.log.error(msg)
1137 if raise_exception_on_error:
1138 raise K8sException(e) from e
1139 else:
1140 return "", -1
1141
1142 async def _local_async_exec_pipe(self,
1143 command1: str,
1144 command2: str,
1145 raise_exception_on_error: bool = True,
1146 show_error_log: bool = True,
1147 encode_utf8: bool = False,
1148 env: dict = None):
1149
1150 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1151 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1152 command = "{} | {}".format(command1, command2)
1153 self.log.debug("Executing async local command: {}, env: {}".format(command, env))
1154
1155 # split command
1156 command1 = shlex.split(command1)
1157 command2 = shlex.split(command2)
1158
1159 environ = os.environ.copy()
1160 if env:
1161 environ.update(env)
1162
1163 try:
1164 read, write = os.pipe()
1165 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1166 os.close(write)
1167 process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
1168 stdout=asyncio.subprocess.PIPE,
1169 env=environ)
1170 os.close(read)
1171 stdout, stderr = await process_2.communicate()
1172
1173 return_code = process_2.returncode
1174
1175 output = ""
1176 if stdout:
1177 output = stdout.decode("utf-8").strip()
1178 # output = stdout.decode()
1179 if stderr:
1180 output = stderr.decode("utf-8").strip()
1181 # output = stderr.decode()
1182
1183 if return_code != 0 and show_error_log:
1184 self.log.debug(
1185 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1186 )
1187 else:
1188 self.log.debug("Return code: {}".format(return_code))
1189
1190 if raise_exception_on_error and return_code != 0:
1191 raise K8sException(output)
1192
1193 if encode_utf8:
1194 output = output.encode("utf-8").strip()
1195 output = str(output).replace("\\n", "\n")
1196
1197 return output, return_code
1198 except asyncio.CancelledError:
1199 raise
1200 except K8sException:
1201 raise
1202 except Exception as e:
1203 msg = "Exception executing command: {} -> {}".format(command, e)
1204 self.log.error(msg)
1205 if raise_exception_on_error:
1206 raise K8sException(e) from e
1207 else:
1208 return "", -1
1209
1210 async def _get_service(self, cluster_id, service_name, namespace):
1211 """
1212 Obtains the data of the specified service in the k8cluster.
1213
1214 :param cluster_id: id of a K8s cluster known by OSM
1215 :param service_name: name of the K8s service in the specified namespace
1216 :param namespace: K8s namespace used by the KDU instance
1217 :return: If successful, it will return a service with the following data:
1218 - `name` of the service
1219 - `type` type of service in the k8 cluster
1220 - `ports` List of ports offered by the service, for each port includes at least
1221 name, port, protocol
1222 - `cluster_ip` Internal ip to be used inside k8s cluster
1223 - `external_ip` List of external ips (in case they are available)
1224 """
1225
1226 # init config, env
1227 paths, env = self._init_paths_env(
1228 cluster_name=cluster_id, create_if_not_exist=True
1229 )
1230
1231 command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1232 self.kubectl_command, paths["kube_config"], namespace, service_name
1233 )
1234
1235 output, _rc = await self._local_async_exec(
1236 command=command, raise_exception_on_error=True, env=env
1237 )
1238
1239 data = yaml.load(output, Loader=yaml.SafeLoader)
1240
1241 service = {
1242 "name": service_name,
1243 "type": self._get_deep(data, ("spec", "type")),
1244 "ports": self._get_deep(data, ("spec", "ports")),
1245 "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
1246 }
1247 if service["type"] == "LoadBalancer":
1248 ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
1249 ip_list = [elem["ip"] for elem in ip_map_list]
1250 service["external_ip"] = ip_list
1251
1252 return service
1253
1254 async def _exec_inspect_comand(
1255 self, inspect_command: str, kdu_model: str, repo_url: str = None
1256 ):
1257 """
1258 Obtains information about a kdu, no cluster (no env)
1259 """
1260
1261 repo_str = ""
1262 if repo_url:
1263 repo_str = " --repo {}".format(repo_url)
1264
1265 idx = kdu_model.find("/")
1266 if idx >= 0:
1267 idx += 1
1268 kdu_model = kdu_model[idx:]
1269
1270 version = ""
1271 if ":" in kdu_model:
1272 parts = kdu_model.split(sep=":")
1273 if len(parts) == 2:
1274 version = "--version {}".format(str(parts[1]))
1275 kdu_model = parts[0]
1276
1277 full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
1278 output, _rc = await self._local_async_exec(
1279 command=full_command, encode_utf8=True
1280 )
1281
1282 return output
1283
1284 async def _store_status(
1285 self,
1286 cluster_id: str,
1287 operation: str,
1288 kdu_instance: str,
1289 namespace: str = None,
1290 check_every: float = 10,
1291 db_dict: dict = None,
1292 run_once: bool = False,
1293 ):
1294 while True:
1295 try:
1296 await asyncio.sleep(check_every)
1297 detailed_status = await self._status_kdu(
1298 cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
1299 return_text=False
1300 )
1301 status = detailed_status.get("info").get("description")
1302 self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
1303 # write status to db
1304 result = await self.write_app_status_to_db(
1305 db_dict=db_dict,
1306 status=str(status),
1307 detailed_status=str(detailed_status),
1308 operation=operation,
1309 )
1310 if not result:
1311 self.log.info("Error writing in database. Task exiting...")
1312 return
1313 except asyncio.CancelledError:
1314 self.log.debug("Task cancelled")
1315 return
1316 except Exception as e:
1317 self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
1318 pass
1319 finally:
1320 if run_once:
1321 return
1322
1323 # params for use in -f file
1324 # returns values file option and filename (in order to delete it at the end)
1325 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1326
1327 if params and len(params) > 0:
1328 self._init_paths_env(
1329 cluster_name=cluster_id, create_if_not_exist=True
1330 )
1331
1332 def get_random_number():
1333 r = random.randrange(start=1, stop=99999999)
1334 s = str(r)
1335 while len(s) < 10:
1336 s = "0" + s
1337 return s
1338
1339 params2 = dict()
1340 for key in params:
1341 value = params.get(key)
1342 if "!!yaml" in str(value):
1343 value = yaml.load(value[7:])
1344 params2[key] = value
1345
1346 values_file = get_random_number() + ".yaml"
1347 with open(values_file, "w") as stream:
1348 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1349
1350 return "-f {}".format(values_file), values_file
1351
1352 return "", None
1353
1354 # params for use in --set option
1355 @staticmethod
1356 def _params_to_set_option(params: dict) -> str:
1357 params_str = ""
1358 if params and len(params) > 0:
1359 start = True
1360 for key in params:
1361 value = params.get(key, None)
1362 if value is not None:
1363 if start:
1364 params_str += "--set "
1365 start = False
1366 else:
1367 params_str += ","
1368 params_str += "{}={}".format(key, value)
1369 return params_str
1370
1371 @staticmethod
1372 def _generate_release_name(chart_name: str):
1373 # check embeded chart (file or dir)
1374 if chart_name.startswith("/"):
1375 # extract file or directory name
1376 chart_name = chart_name[chart_name.rfind("/") + 1:]
1377 # check URL
1378 elif "://" in chart_name:
1379 # extract last portion of URL
1380 chart_name = chart_name[chart_name.rfind("/") + 1:]
1381
1382 name = ""
1383 for c in chart_name:
1384 if c.isalpha() or c.isnumeric():
1385 name += c
1386 else:
1387 name += "-"
1388 if len(name) > 35:
1389 name = name[0:35]
1390
1391 # if does not start with alpha character, prefix 'a'
1392 if not name[0].isalpha():
1393 name = "a" + name
1394
1395 name += "-"
1396
1397 def get_random_number():
1398 r = random.randrange(start=1, stop=99999999)
1399 s = str(r)
1400 s = s.rjust(10, "0")
1401 return s
1402
1403 name = name + get_random_number()
1404 return name.lower()