Fix bug 1298
osm/N2VC.git: n2vc/k8s_helm_base_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import abc
import asyncio
import random
import time
import shlex
import shutil
import stat
import subprocess
import os
import yaml
from uuid import uuid4

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector


class K8sHelmBaseConnector(K8sConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """
    service_account = "osm"

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    @staticmethod
    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
        """
        Parses a cluster_uuid stored in the database, which can be either
        'namespace:cluster_id' or, for backward compatibility, just cluster_id
        """
        namespace, _, cluster_id = cluster_uuid.rpartition(':')
        return namespace, cluster_id
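        # Illustrative sketch of the parsing above (identifiers are made up):
        #   _get_namespace_cluster_id("kube-system:776aa2f6") -> ("kube-system", "776aa2f6")
        #   _get_namespace_cluster_id("776aa2f6")             -> ("", "776aa2f6")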

    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        if reuse_cluster_uuid:
            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
            namespace = namespace_ or namespace
        else:
            cluster_id = str(uuid4())
        cluster_uuid = "{}:{}".format(namespace, cluster_id)

        self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))

        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        # write the credentials to the kubeconfig file with owner-only permissions
        mode = stat.S_IRUSR | stat.S_IWUSR
        with open(paths["kube_config"], "w") as f:
            f.write(k8s_creds)
        os.chmod(paths["kube_config"], mode)

        # Code with initialization specific of helm version
        n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

        # sync fs with local data
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_uuid, n2vc_installed_sw
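        # Note: the cluster_uuid returned above always has the composite
        # "namespace:cluster_id" form built in this method, e.g.
        # "kube-system:776aa2f6-..." (the identifier shown is illustrative only).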

    async def repo_add(
        self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init_env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # helm repo update
        command = "{} repo update".format(
            self._helm_command
        )
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=False, env=env)

        # helm repo add name url
        command = "{} repo add {} {}".format(
            self._helm_command, name, url
        )
        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

    async def repo_list(self, cluster_uuid: str) -> list:
        """
        Get the list of registered repositories

        :return: list of registered repositories: [ (name, url) .... ]
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list repositories for cluster {}".format(cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # config filename
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} repo list --output yaml".format(
            self._helm_command
        )

        # Do not raise an exception here: if there are no repositories we just return an empty list
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        if _rc == 0:
            if output and len(output) > 0:
                repos = yaml.load(output, Loader=yaml.SafeLoader)
                # unify format between helm2 and helm3 setting all keys lowercase
                return self._lower_keys_list(repos)
            else:
                return []
        else:
            return []
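        # For reference, "helm repo list --output yaml" is expected to yield a list of
        # name/url entries roughly like the sketch below (repository values are made up);
        # keys are lower-cased above so helm2 and helm3 outputs look the same:
        #   - name: bitnami
        #     url: https://charts.bitnami.com/bitnami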

    async def repo_remove(self, cluster_uuid: str, name: str):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} repo remove {}".format(
            self._helm_command, name
        )
        await self._local_async_exec(command=command, raise_exception_on_error=True, env=env)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:

        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
                       .format(cluster_id, uninstall_sw))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("name")
                            chart = r.get("chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            # do not raise: uninstalling previously installed helm
                            # releases has been seen to fail in some cases
                            self.log.warn(
                                "Error uninstalling release {}: {}".format(kdu_instance, e)
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and force is not enabled. "
                        "Leaving K8s helm environment"
                    ).format(cluster_id)
                    self.log.warn(msg)
                    uninstall_sw = False  # allow removing the k8s cluster without uninstalling Tiller

        if uninstall_sw:
            await self._uninstall_sw(cluster_id, namespace)

        # delete cluster directory
        self.log.debug("Removing directory {}".format(cluster_id))
        self.fs.file_delete(cluster_id, ignore_non_exist=True)
        # also remove the local directory if it still exists
        direct = self.fs.path + "/" + cluster_id
        shutil.rmtree(direct, ignore_errors=True)

        return True

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        # version
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = self._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                pass

        command = self._get_install_command(kdu_model, kdu_instance, namespace,
                                            params_str, version, atomic, timeout)

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove the temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=namespace,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance
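    # Minimal usage sketch (all values below are hypothetical): install a chart
    # pinned to a version; the ":<version>" suffix is split off by the code above.
    #   kdu_instance = await helm_conn.install(
    #       cluster_uuid="kube-system:776aa2f6-...",
    #       kdu_model="stable/openldap:1.2.3",
    #       atomic=True,
    #       timeout=300,
    #       params={"replicaCount": 2},
    #       db_dict=db_dict,
    #   )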

    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        # version
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
                                            params_str, version, atomic, timeout)

        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        # remove the temporary values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
                                             revision)

        self.log.debug("rolling_back: {}".format(command))

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` or
        `uninstall` call (this call should happen after all
        _terminate-config-primitive_ of the VNF are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name for the KDU instance to be deleted
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_id
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # look for instance to obtain namespace
        instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
        if not instance_info:
            raise K8sException("kdu_instance {} not found".format(kdu_instance))

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        return self._output_to_table(output)

    async def instances_list(self, cluster_uuid: str) -> list:
        """
        returns a list of deployed releases in a cluster

        :param cluster_uuid: the 'cluster' or 'namespace:cluster'
        :return:
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list releases for cluster {}".format(cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # execute internal command
        result = await self._instances_list(cluster_id)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        return result

    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
        instances = await self.instances_list(cluster_uuid=cluster_uuid)
        for instance in instances:
            if instance.get("name") == kdu_instance:
                return instance
        self.log.debug("Instance {} not found".format(kdu_instance))
        return None

    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid: The UUID of the cluster or namespace:cluster
        :param kdu_instance: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "different from rollback, upgrade and status"
        )

    async def get_services(self,
                           cluster_uuid: str,
                           kdu_instance: str,
                           namespace: str) -> list:
695 """
696 Returns a list of services defined for the specified kdu instance.
697
698 :param cluster_uuid: UUID of a K8s cluster known by OSM
699 :param kdu_instance: unique name for the KDU instance
700 :param namespace: K8s namespace used by the KDU instance
701 :return: If successful, it will return a list of services, Each service
702 can have the following data:
703 - `name` of the service
704 - `type` type of service in the k8 cluster
705 - `ports` List of ports offered by the service, for each port includes at least
706 name, port, protocol
707 - `cluster_ip` Internal ip to be used inside k8s cluster
708 - `external_ip` List of external ips (in case they are available)
709 """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # get list of services names for kdu
        service_names = await self._get_services(cluster_id, kdu_instance, namespace)

        service_list = []
        for service in service_names:
            service = await self._get_service(cluster_id, service, namespace)
            service_list.append(service)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        return service_list

    async def get_service(self,
                          cluster_uuid: str,
                          service_name: str,
                          namespace: str) -> object:

        self.log.debug(
            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                service_name, namespace, cluster_uuid)
        )

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        service = await self._get_service(cluster_id, service_name, namespace)

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        return service

    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:

        self.log.debug(
            "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # get instance: needed to obtain namespace
        instances = await self._instances_list(cluster_id=cluster_id)
        for instance in instances:
            if instance.get("name") == kdu_instance:
                break
        else:
            # instance does not exist
            raise K8sException("Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_id))

        status = await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance["namespace"],
            show_error_log=True,
            return_text=True,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        return status

    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model values {} from (optional) repo: {}".format(
                kdu_model, repo_url
            )
        )

        return await self._exec_inspect_comand(
            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
        )

    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
        )

    async def synchronize_repos(self, cluster_uuid: str):

        self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
        try:
            db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
            db_repo_dict = self._get_db_repos_dict(db_repo_ids)

            local_repo_list = await self.repo_list(cluster_uuid)
            local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

            deleted_repo_list = []
            added_repo_dict = {}

            # iterate over the list of repos in the database that should be
            # added if not present
            for repo_name, db_repo in db_repo_dict.items():
                try:
                    # check if it is already present
                    curr_repo_url = local_repo_dict.get(db_repo["name"])
                    repo_id = db_repo.get("_id")
                    if curr_repo_url != db_repo["url"]:
                        if curr_repo_url:
                            self.log.debug("repo {} url changed, delete and add it again".format(
                                db_repo["url"]))
                            await self.repo_remove(cluster_uuid, db_repo["name"])
                            deleted_repo_list.append(repo_id)

                        # add repo
                        self.log.debug("add repo {}".format(db_repo["name"]))
                        await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
                        added_repo_dict[repo_id] = db_repo["name"]
                except Exception as e:
                    raise K8sException(
                        "Error adding repo id: {}, err_msg: {} ".format(
                            repo_id, repr(e)
                        )
                    )

            # Delete repos that are present but not in nbi_list
            for repo_name in local_repo_dict:
                if not db_repo_dict.get(repo_name) and repo_name != "stable":
                    self.log.debug("delete repo {}".format(repo_name))
                    try:
                        await self.repo_remove(cluster_uuid, repo_name)
                        deleted_repo_list.append(repo_name)
                    except Exception as e:
                        self.log.warning(
                            "Error deleting repo, name: {}, err_msg: {}".format(
                                repo_name, str(e)
                            )
                        )

            return deleted_repo_list, added_repo_dict

        except K8sException:
            raise
        except Exception as e:
            # do not leak internal exceptions: log and re-raise as a generic error
            self.log.error("Error synchronizing repos: {}".format(e))
            raise Exception("Error synchronizing repos: {}".format(e))

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8cluster with helm-id : {} not found".format(cluster_uuid)
            )

    def _get_db_repos_dict(self, repo_ids: list):
        db_repos_dict = {}
        for repo_id in repo_ids:
            db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
            db_repos_dict[db_repo["name"]] = db_repo
        return db_repos_dict

    """
    ####################################################################################
    ################################### TO BE IMPLEMENTED SUBCLASSES ###################
    ####################################################################################
    """

    @abc.abstractmethod
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs and returns them.
        Helm3-specific dirs are also created according to the new directory
        specification; their paths are not returned but assigned to helm
        environment variables.

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """

    @abc.abstractmethod
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization
        """

    @abc.abstractmethod
    async def _instances_list(self, cluster_id):
        """
        Implements the helm version dependent helm instances list
        """

    @abc.abstractmethod
    async def _get_services(self, cluster_id, kdu_instance, namespace):
        """
        Implements the helm version dependent method to obtain services from a helm instance
        """

    @abc.abstractmethod
    async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
                          show_error_log: bool = False, return_text: bool = False):
        """
        Implements the helm version dependent method to obtain status of a helm instance
        """

    @abc.abstractmethod
    def _get_install_command(self, kdu_model, kdu_instance, namespace,
                             params_str, version, atomic, timeout) -> str:
        """
        Obtain command to be executed to install the indicated instance
        """

    @abc.abstractmethod
    def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
                             params_str, version, atomic, timeout) -> str:
        """
        Obtain command to be executed to upgrade the indicated instance
        """

    @abc.abstractmethod
    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
        """
        Obtain command to be executed to rollback the indicated instance
        """

    @abc.abstractmethod
    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
        """
        Obtain command to be executed to delete the indicated instance
        """

    @abc.abstractmethod
    def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
                             version: str):
        """
        Obtain command to be executed to obtain information about the kdu
        """

    @abc.abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Uninstalls the cluster software installed for helm. Its behaviour depends
        on the helm version:
        For Helm v2 it is called when Tiller must be uninstalled.
        For Helm v3 it does nothing and does not need to be called.
        """

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    @staticmethod
    def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                raise K8sException(msg)

    @staticmethod
    def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj

    @staticmethod
    def _output_to_lines(output: str) -> list:
        output_lines = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.strip()
            if len(line) > 0:
                output_lines.append(line)
        return output_lines

    @staticmethod
    def _output_to_table(output: str) -> list:
        output_table = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.replace("\t", " ")
            line_list = list()
            output_table.append(line_list)
            cells = line.split(sep=" ")
            for cell in cells:
                cell = cell.strip()
                if len(cell) > 0:
                    line_list.append(cell)
        return output_table

    @staticmethod
    def _parse_services(output: str) -> list:
        lines = output.splitlines(keepends=False)
        services = []
        for line in lines:
            line = line.replace("\t", " ")
            cells = line.split(sep=" ")
            if len(cells) > 0 and cells[0].startswith("service/"):
                elems = cells[0].split(sep="/")
                if len(elems) > 1:
                    services.append(elems[1])
        return services
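    # Illustrative sketch (service name is made up): given kubectl-style lines such as
    #   "service/myservice   ClusterIP   10.152.183.10   <none>   80/TCP"
    # _parse_services() returns ["myservice"].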

    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
        target = dictionary
        value = None
        try:
            for m in members:
                value = target.get(m)
                if not value:
                    return None
                else:
                    target = value
        except Exception:
            pass
        return value
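        # Illustrative sketch (data is made up):
        #   data = {"spec": {"type": "ClusterIP"}}
        #   _get_deep(data, ("spec", "type"))            -> "ClusterIP"
        #   _get_deep(data, ("status", "loadBalancer"))  -> None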

    # find key:value in several lines
    @staticmethod
    def _find_in_lines(p_lines: list, p_key: str) -> str:
        for line in p_lines:
            try:
                if line.startswith(p_key + ":"):
                    parts = line.split(":")
                    the_value = parts[1].strip()
                    return the_value
            except Exception:
                # ignore it
                pass
        return None

    @staticmethod
    def _lower_keys_list(input_list: list):
        """
        Transform the keys in a list of dictionaries to lower case and returns a new list
        of dictionaries
        """
        new_list = []
        for dictionary in input_list:
            new_dict = dict((k.lower(), v) for k, v in dictionary.items())
            new_list.append(new_dict)
        return new_list
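        # Illustrative sketch (repository values are made up): helm2-style keys are unified:
        #   _lower_keys_list([{"Name": "bitnami", "URL": "https://charts.bitnami.com/bitnami"}])
        #   -> [{"name": "bitnami", "url": "https://charts.bitnami.com/bitnami"}]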

    def _local_exec(self, command: str) -> (str, int):
        command = self._remove_multiple_spaces(command)
        self.log.debug("Executing sync local command: {}".format(command))
        # returns empty output and return code 1 if the command fails
        output = ""
        try:
            output = subprocess.check_output(
                command, shell=True, universal_newlines=True
            )
            return_code = 0
            self.log.debug(output)
        except Exception:
            return_code = 1

        return output, return_code

    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None
    ) -> (str, int):

        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}, env: {}".format(command, env))

        # split command
        command = shlex.split(command)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
                env=environ
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

    async def _local_async_exec_pipe(self,
                                     command1: str,
                                     command2: str,
                                     raise_exception_on_error: bool = True,
                                     show_error_log: bool = True,
                                     encode_utf8: bool = False,
                                     env: dict = None):

        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug("Executing async local command: {}, env: {}".format(command, env))

        # split command
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            read, write = os.pipe()
            await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
            os.close(write)
            process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
                                                             stdout=asyncio.subprocess.PIPE,
                                                             env=environ)
            os.close(read)
            stdout, stderr = await process_2.communicate()

            return_code = process_2.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

    async def _get_service(self, cluster_id, service_name, namespace):
        """
        Obtains the data of the specified service in the k8s cluster.

        :param cluster_id: id of a K8s cluster known by OSM
        :param service_name: name of the K8s service in the specified namespace
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return a service with the following data:
        - `name` of the service
        - `type` type of service in the k8s cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
        """

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command, paths["kube_config"], namespace, service_name
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
        }
        if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            ip_list = [elem["ip"] for elem in ip_map_list]
            service["external_ip"] = ip_list

        return service
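        # A successful call returns a dict shaped roughly as follows (all values
        # below are illustrative, not taken from a real cluster):
        #   {"name": "myservice", "type": "LoadBalancer",
        #    "ports": [{"name": "http", "port": 80, "protocol": "TCP"}],
        #    "cluster_ip": "10.152.183.10", "external_ip": ["172.21.200.10"]}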

    async def _exec_inspect_comand(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):
        """
        Obtains information about a kdu; it does not need a cluster (no environment is used)
        """

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)

        idx = kdu_model.find("/")
        if idx >= 0:
            idx += 1
            kdu_model = kdu_model[idx:]

        version = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = "--version {}".format(str(parts[1]))
                kdu_model = parts[0]

        full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
        output, _rc = await self._local_async_exec(
            command=full_command, encode_utf8=True
        )

        return output

    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        namespace: str = None,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
                    return_text=False
                )
                status = detailed_status.get("info").get("description")
                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
                pass
            finally:
                if run_once:
                    return

    # params for use in -f file
    # returns values file option and filename (in order to delete it at the end)
    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):

        if params and len(params) > 0:
            self._init_paths_env(
                cluster_name=cluster_id, create_if_not_exist=True
            )

            def get_random_number():
                r = random.randrange(start=1, stop=99999999)
                s = str(r)
                while len(s) < 10:
                    s = "0" + s
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                if "!!yaml" in str(value):
                    value = yaml.load(value[7:], Loader=yaml.SafeLoader)
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None

    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
        params_str = ""
        if params and len(params) > 0:
            start = True
            for key in params:
                value = params.get(key, None)
                if value is not None:
                    if start:
                        params_str += "--set "
                        start = False
                    else:
                        params_str += ","
                    params_str += "{}={}".format(key, value)
        return params_str
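        # Illustrative sketch (parameters are made up):
        #   _params_to_set_option({"replicaCount": 2, "service.type": "NodePort"})
        #   -> "--set replicaCount=2,service.type=NodePort"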

    @staticmethod
    def _generate_release_name(chart_name: str):
        # check embedded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1:]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1:]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        if len(name) > 35:
            name = name[0:35]

        # if the name does not start with an alphabetic character, prefix it with 'a'
        if not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            r = random.randrange(start=1, stop=99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()
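        # Illustrative sketch (the 10-digit suffix is random; the one shown is made up):
        #   _generate_release_name("stable/openldap") -> "stable-openldap-0012345678"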